From a85a5124eac25785a3fd9e3ec936105a169dcbfc Mon Sep 17 00:00:00 2001 From: Justin Heesemann Date: Tue, 8 Aug 2023 15:10:52 +0200 Subject: [PATCH 1/2] Azure Blob Storage Syncing. Tested with AWS source and Azure Target on 10k files --- cli/cli.go | 16 ++-- cli/setup.go | 5 +- go.mod | 8 +- go.sum | 29 ++++-- storage/az/az.go | 184 ++++++++++++++++++++++++++++++++++++++ storage/error_handling.go | 5 ++ storage/storage.go | 1 + 7 files changed, 233 insertions(+), 15 deletions(-) create mode 100644 storage/az/az.go diff --git a/cli/cli.go b/cli/cli.go index b164801..f81f5f3 100644 --- a/cli/cli.go +++ b/cli/cli.go @@ -45,19 +45,19 @@ type args struct { // Source config Source string `arg:"positional"` SourceNoSign bool `arg:"--sn" help:"Don't sign request to source AWS for anonymous access"` - SourceKey string `arg:"--sk" help:"Source AWS key / Swift User"` - SourceSecret string `arg:"--ss" help:"Source AWS secret / Swift Key"` + SourceKey string `arg:"--sk" help:"Source AWS key / Swift User / AZ storage account"` + SourceSecret string `arg:"--ss" help:"Source AWS secret / Swift Key / AZ account key"` SourceToken string `arg:"--st" help:"Source AWS token / Swift Tenant"` SourceRegion string `arg:"--sr" help:"Source AWS Region / Swift Domain"` - SourceEndpoint string `arg:"--se" help:"Source AWS Endpoint / Swift Auth URL"` + SourceEndpoint string `arg:"--se" help:"Source AWS Endpoint / Swift Auth URL / AZ Endpoint"` // Target config Target string `arg:"positional"` TargetNoSign bool `arg:"--tn" help:"Don't sign request to target AWS for anonymous access"` - TargetKey string `arg:"--tk" help:"Target AWS key"` - TargetSecret string `arg:"--ts" help:"Target AWS secret"` + TargetKey string `arg:"--tk" help:"Target AWS key / AZ storage account"` + TargetSecret string `arg:"--ts" help:"Target AWS secret / AZ account key"` TargetToken string `arg:"--tt" help:"Target AWS session token"` TargetRegion string `arg:"--tr" help:"Target AWS Region"` - TargetEndpoint string `arg:"--te" help:"Target AWS Endpoint"` + TargetEndpoint string `arg:"--te" help:"Target AWS Endpoint / AZ Endpoint"` // S3 config S3Retry uint `arg:"--s3-retry" help:"Max numbers of retries to sync file"` S3RetryInterval uint `arg:"--s3-retry-sleep" help:"Sleep interval (sec) between sync retries on error"` @@ -224,6 +224,10 @@ func parseConn(cStr string) (conn connect, err error) { case "fs": conn.Type = storage.TypeFS conn.Path = strings.TrimPrefix(cStr, "fs://") + case "az": + conn.Type = storage.TypeAzBlob + conn.Bucket = u.Host + conn.Path = strings.TrimPrefix(u.Path, "/") default: conn.Type = storage.TypeFS conn.Path = cStr diff --git a/cli/setup.go b/cli/setup.go index 4adab8f..21f26b1 100644 --- a/cli/setup.go +++ b/cli/setup.go @@ -3,6 +3,7 @@ package main import ( "context" "fmt" + "github.com/larrabee/s3sync/storage/az" "os" "github.com/larrabee/s3sync/pipeline" @@ -53,7 +54,9 @@ func setupStorages(ctx context.Context, syncGroup *pipeline.Group, cli *argsPars return err } - // func NewStorage(user, key, tenant, domain, authUrl string, bucketName, prefix string, skipSSLVerify bool) (*Storage, error) { + case storage.TypeAzBlob: + targetStorage = az.NewAzStorage(cli.TargetNoSign, cli.TargetKey, cli.TargetSecret, cli.TargetToken, cli.TargetRegion, cli.TargetEndpoint, + cli.Target.Bucket, cli.Target.Path, cli.S3KeysPerReq, cli.S3Retry, cli.S3RetryInterval, cli.SkipSSLVerify) } if sourceStorage == nil { diff --git a/go.mod b/go.mod index 913bba1..a7960b0 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/larrabee/s3sync go 1.18 require ( + github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.1.0 github.com/alexflint/go-arg v1.4.2 github.com/aws/aws-sdk-go v1.44.166 github.com/gophercloud/gophercloud v1.1.1 @@ -10,13 +11,16 @@ require ( github.com/karrick/godirwalk v1.16.1 github.com/larrabee/ratelimit v1.0.4 github.com/mattn/go-isatty v0.0.12 - github.com/ncw/swift/v2 v2.0.1 github.com/pkg/xattr v0.4.2 github.com/sirupsen/logrus v1.8.1 ) require ( + github.com/Azure/azure-sdk-for-go/sdk/azcore v1.6.0 // indirect + github.com/Azure/azure-sdk-for-go/sdk/internal v1.3.0 // indirect github.com/alexflint/go-scalar v1.1.0 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect - golang.org/x/sys v0.1.0 // indirect + golang.org/x/net v0.10.0 // indirect + golang.org/x/sys v0.8.0 // indirect + golang.org/x/text v0.9.0 // indirect ) diff --git a/go.sum b/go.sum index 9da305b..b53f853 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,12 @@ +github.com/Azure/azure-sdk-for-go/sdk/azcore v1.6.0 h1:8kDqDngH+DmVBiCtIjCFTGa7MBnsIOkF9IccInFEbjk= +github.com/Azure/azure-sdk-for-go/sdk/azcore v1.6.0/go.mod h1:bjGvMhVMb+EEm3VRNQawDMUyMMjo+S5ewNjflkep/0Q= +github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.3.0 h1:vcYCAze6p19qBW7MhZybIsqD8sMV8js0NyQM8JDnVtg= +github.com/Azure/azure-sdk-for-go/sdk/internal v1.3.0 h1:sXr+ck84g/ZlZUOZiNELInmMgOsuGwdjjVkEIde0OtY= +github.com/Azure/azure-sdk-for-go/sdk/internal v1.3.0/go.mod h1:okt5dMMTOFjX/aovMlrjvvXoPMBVSPzk9185BT0+eZM= +github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage v1.2.0 h1:Ma67P/GGprNwsslzEH6+Kb8nybI8jpDTm4Wmzu2ReK8= +github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.1.0 h1:nVocQV40OQne5613EeLayJiRAJuKlBGy+m22qWG+WRg= +github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.1.0/go.mod h1:7QJP7dr2wznCMeqIrhMgWGf7XpAQnVrJqDm9nvV3Cu4= +github.com/AzureAD/microsoft-authentication-library-for-go v1.0.0 h1:OBhqkivkhkMqLPymWEppkm7vgPQY2XsHoEkaMQ0AdZY= github.com/alexflint/go-arg v1.4.2 h1:lDWZAXxpAnZUq4qwb86p/3rIJJ2Li81EoMbTMujhVa0= github.com/alexflint/go-arg v1.4.2/go.mod h1:9iRbDxne7LcR/GSvEr7ma++GLpdIU1zrghf2y2768kM= github.com/alexflint/go-scalar v1.0.0/go.mod h1:GpHzbCOZXEKMEcygYQ5n/aa4Aq84zbxjy3MxYW0gjYw= @@ -9,6 +18,9 @@ github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7Do github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dnaeon/go-vcr v1.2.0 h1:zHCHvJYTMh1N7xnV7zf1m1GPBF9Ad0Jk/whtQ1663qI= +github.com/golang-jwt/jwt/v4 v4.5.0 h1:7cYmW1XlMY7h7ii7UhUyChSgS5wUJEnm9uZVTGqOWzg= +github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/gophercloud/gophercloud v1.1.1 h1:MuGyqbSxiuVBqkPZ3+Nhbytk1xZxhmfCB2Rg1cJWFWM= github.com/gophercloud/gophercloud v1.1.1/go.mod h1:aAVqcocTSXh2vYFZ1JTvx4EQmfgzxRcNupUfxZbBNDM= github.com/gosuri/uilive v0.0.3 h1:kvo6aB3pez9Wbudij8srWo4iY6SFTTxTKOkb+uRCE8I= @@ -25,12 +37,12 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/pty v1.1.8/go.mod h1:O1sed60cT9XZ5uDucP5qwvh+TE3NnUj51EiZO/lmSfw= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= github.com/larrabee/ratelimit v1.0.4 h1:cVhspgo8QSRMb76pIWwqSE19ABxRhH4j8kbe35Csw4I= github.com/larrabee/ratelimit v1.0.4/go.mod h1:jlhboGLs+oa8LIfN4shmd1ONRSkGcwcGqYndDwdpSsc= github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY= github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= -github.com/ncw/swift/v2 v2.0.1 h1:q1IN8hNViXEv8Zvg3Xdis4a3c4IlIGezkYz09zQL5J0= -github.com/ncw/swift/v2 v2.0.1/go.mod h1:z0A9RVdYPjNjXVo2pDOPxZ4eu3oarO1P91fTItcb+Kg= +github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8 h1:KoWmjvw+nsYOo29YJK9vDA65RGE3NrOnUtO7a+RF9HU= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/xattr v0.4.2 h1:fbVxr9lvkToTGgPljVszvFsOdcbSv5BmGABneyxRgZM= github.com/pkg/xattr v0.4.2/go.mod h1:sBD3RAqlr8Q+RC3FutZcikpT8nyDrIEEBw2J744gVWs= @@ -39,19 +51,21 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE= github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20220829220503-c86fa9a7ed90/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= +golang.org/x/crypto v0.9.0 h1:LF6fAI+IutBocDJ2OT0Q1g8plpYljMZ4+lty+dsqw3g= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= -golang.org/x/net v0.1.0 h1:hZ/3BUoy5aId7sCpA/Tc5lt8DkFgdVS2onTpJsZ/fl0= golang.org/x/net v0.1.0/go.mod h1:Cx3nUiGt4eDBEyega/BKRp+/AlGL8hYe7U9odMt2Cco= +golang.org/x/net v0.10.0 h1:X2//UzNDwYmtCLn7To6G58Wr6f5ahEAQgKNzv9Y951M= +golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -63,8 +77,9 @@ golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.1.0 h1:kunALQeHf1/185U1i0GOB/fy1IPRDDpuoOOqRReG57U= golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU= +golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= @@ -72,8 +87,9 @@ golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= -golang.org/x/text v0.4.0 h1:BrVqGRd7+k1DiOgtnFvAkoQEWQvBc25ouMJM6429SFg= golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE= +golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= @@ -84,3 +100,4 @@ gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8 gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/storage/az/az.go b/storage/az/az.go new file mode 100644 index 0000000..c615601 --- /dev/null +++ b/storage/az/az.go @@ -0,0 +1,184 @@ +package az + +import ( + "bytes" + "context" + "errors" + "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/request" + "github.com/larrabee/s3sync/storage" + "io" + "time" + + "github.com/larrabee/ratelimit" +) +import ( + "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob" +) + +// S3Storage configuration. +type AzStorage struct { + client *azblob.Client + awsBucket *string + prefix string + keysPerReq int64 + retryCnt uint + retryInterval time.Duration + ctx context.Context + listMarker *string + rlBucket ratelimit.Bucket +} + +// NewAzStorage return new configured AZ storage. +// +// You should always create new storage with this constructor. +func NewAzStorage(awsNoSign bool, awsAccessKey, awsSecretKey, awsToken, awsRegion, endpoint, bucketName, prefix string, keysPerReq int64, retryCnt uint, retryDelay time.Duration, skipSSLVerify bool) *AzStorage { + credential, err := azblob.NewSharedKeyCredential(awsAccessKey, awsSecretKey) + cl, err := azblob.NewClientWithSharedKeyCredential(endpoint, credential, nil) + + if err != nil { + return nil + } + st := AzStorage{ + awsBucket: &bucketName, + client: cl, + prefix: prefix, + keysPerReq: keysPerReq, + retryCnt: retryCnt, + retryInterval: retryDelay, + ctx: context.TODO(), + rlBucket: ratelimit.NewFakeBucket(), + } + + return &st +} + +// WithContext add's context to storage. +func (st *AzStorage) WithContext(ctx context.Context) { + st.ctx = ctx +} + +// WithRateLimit set rate limit (bytes/sec) for storage. +func (st *AzStorage) WithRateLimit(limit int) error { + bucket, err := ratelimit.NewBucketWithRate(float64(limit), int64(limit)) + if err != nil { + return err + } + st.rlBucket = bucket + return nil +} + +// List AZ container and send founded objects to chan. +func (st *AzStorage) List(output chan<- *storage.Object) error { + flatPager := st.client.NewListBlobsFlatPager(*st.awsBucket, nil) + for flatPager.More() { + page, err := flatPager.NextPage(st.ctx) + if err != nil { + return err + } + for _, obj := range page.Segment.BlobItems { + output <- &storage.Object{ + Key: obj.Name, + ETag: storage.StrongEtag(obj.Metadata["etag"]), + Mtime: obj.Properties.LastModified, + StorageClass: (*string)(obj.Properties.AccessTier), + IsLatest: aws.Bool(true), + } + } + } + storage.Log.Debugf("Listing bucket finished") + return nil +} + +// PutObject saves object to AZ. +// PutObject ignore VersionId, it always save object as latest version. +func (st *AzStorage) PutObject(obj *storage.Object) error { + var objReader io.ReadSeeker + if obj.Content == nil { + if obj.ContentStream == nil { + return errors.New("object has no content") + } + buf := bytes.NewBuffer(make([]byte, 0, aws.Int64Value(obj.ContentLength))) + if _, err := io.Copy(ratelimit.NewWriter(buf, st.rlBucket), obj.ContentStream); err != nil { + return err + } + obj.ContentStream.Close() + objReader = bytes.NewReader(buf.Bytes()) + } else { + objReader = bytes.NewReader(*obj.Content) + } + + rlReader := ratelimit.NewReadSeeker(objReader, st.rlBucket) + options := azblob.UploadStreamOptions{ + BlockSize: 0, + Concurrency: 0, + TransactionalValidation: nil, + HTTPHeaders: nil, + Metadata: nil, + AccessConditions: nil, + AccessTier: nil, + Tags: nil, + CPKInfo: nil, + CPKScopeInfo: nil, + } + _, err := st.client.UploadStream(st.ctx, *st.awsBucket, st.prefix+*obj.Key, rlReader, &options) + return err +} + +func withAcceptEncoding(e string) request.Option { + return func(r *request.Request) { + r.HTTPRequest.Header.Add("Accept-Encoding", e) + } +} + +// GetObjectContent read object content and metadata from AZ Blob. +func (st *AzStorage) GetObjectContent(obj *storage.Object) error { + options := azblob.DownloadStreamOptions{} + stream, err := st.client.DownloadStream(st.ctx, *st.awsBucket, *obj.Key, &options) + if err != nil { + return err + } + + buf := bytes.NewBuffer(make([]byte, 0, aws.Int64Value(stream.ContentLength))) + if _, err := io.Copy(ratelimit.NewWriter(buf, st.rlBucket), stream.Body); err != nil { + return err + } + + data := buf.Bytes() + obj.Content = &data + obj.ContentType = stream.ContentType + obj.ContentLength = stream.ContentLength + obj.ContentDisposition = stream.ContentDisposition + obj.ContentEncoding = stream.ContentEncoding + obj.ContentLanguage = stream.ContentLanguage + obj.Metadata = stream.Metadata + obj.Mtime = stream.LastModified + obj.CacheControl = stream.CacheControl + + return nil +} + +// GetObjectACL read object ACL from AZ Blob. +func (st *AzStorage) GetObjectACL(obj *storage.Object) error { + //TODO implement + + return nil +} + +// GetObjectMeta update object metadata from AZ Blob. +func (st *AzStorage) GetObjectMeta(obj *storage.Object) error { + options := blob.GetPropertiesOptions{} + properties, err := st.client.ServiceClient().NewContainerClient(*st.awsBucket).NewBlobClient(st.prefix+*obj.Key).GetProperties(st.ctx, &options) + obj.ContentType = properties.ContentType + if err != nil { + return err + } + return nil +} + +// DeleteObject remove object from AZ Blob. +func (st *AzStorage) DeleteObject(obj *storage.Object) error { + //TODO Implement + return nil +} diff --git a/storage/error_handling.go b/storage/error_handling.go index cd9cf49..93e41f1 100644 --- a/storage/error_handling.go +++ b/storage/error_handling.go @@ -3,6 +3,7 @@ package storage import ( "context" "errors" + "github.com/Azure/azure-sdk-for-go/sdk/azcore" "os" "github.com/aws/aws-sdk-go/aws/awserr" @@ -27,6 +28,10 @@ func IsErrNotExist(err error) bool { if errors.As(err, &sErr) { return true } + var azErr *azcore.ResponseError + if errors.As(err, &azErr) { + return azErr.StatusCode >= 400 && azErr.StatusCode < 410 + } return false } diff --git a/storage/storage.go b/storage/storage.go index 80cf6d1..fcfb0fb 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -23,6 +23,7 @@ const ( TypeFS TypeS3Stream TypeSwift + TypeAzBlob ) // Object contain content and metadata of S3 object. From 4e6a823093154371e32df9f22ada74b5a300dbfe Mon Sep 17 00:00:00 2001 From: Justin Heesemann Date: Wed, 9 Aug 2023 15:06:57 +0200 Subject: [PATCH 2/2] Fix memory issue when using s3s as a source. No need to copy reader to buffer, when we can stream the reader directly --- go.mod | 2 +- storage/az/az.go | 18 +++++++++--------- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/go.mod b/go.mod index a7960b0..f40e256 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/larrabee/s3sync go 1.18 require ( + github.com/Azure/azure-sdk-for-go/sdk/azcore v1.6.0 github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.1.0 github.com/alexflint/go-arg v1.4.2 github.com/aws/aws-sdk-go v1.44.166 @@ -16,7 +17,6 @@ require ( ) require ( - github.com/Azure/azure-sdk-for-go/sdk/azcore v1.6.0 // indirect github.com/Azure/azure-sdk-for-go/sdk/internal v1.3.0 // indirect github.com/alexflint/go-scalar v1.1.0 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect diff --git a/storage/az/az.go b/storage/az/az.go index c615601..434bde1 100644 --- a/storage/az/az.go +++ b/storage/az/az.go @@ -94,22 +94,22 @@ func (st *AzStorage) List(output chan<- *storage.Object) error { // PutObject saves object to AZ. // PutObject ignore VersionId, it always save object as latest version. func (st *AzStorage) PutObject(obj *storage.Object) error { - var objReader io.ReadSeeker + var objReader io.Reader if obj.Content == nil { if obj.ContentStream == nil { return errors.New("object has no content") } - buf := bytes.NewBuffer(make([]byte, 0, aws.Int64Value(obj.ContentLength))) - if _, err := io.Copy(ratelimit.NewWriter(buf, st.rlBucket), obj.ContentStream); err != nil { - return err - } - obj.ContentStream.Close() - objReader = bytes.NewReader(buf.Bytes()) + defer func(ContentStream io.ReadCloser) { + err := ContentStream.Close() + if err != nil { + storage.Log.Warnf("failed to close input stream: %v", err) + } + }(obj.ContentStream) + objReader = obj.ContentStream } else { objReader = bytes.NewReader(*obj.Content) } - - rlReader := ratelimit.NewReadSeeker(objReader, st.rlBucket) + rlReader := ratelimit.NewReader(objReader, st.rlBucket) options := azblob.UploadStreamOptions{ BlockSize: 0, Concurrency: 0,