Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions cmd/client-s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -739,6 +739,7 @@ func (c *S3Client) notificationToEventsInfo(ninfo notification.Info) []EventInfo
Host: record.Source.Host,
Port: record.Source.Port,
UserAgent: record.Source.UserAgent,
VersionID: record.S3.Object.VersionID,
}
} else if strings.HasPrefix(record.EventName, "s3:ObjectCreated:PutRetention") {
eventsInfo[i] = EventInfo{
Expand All @@ -750,6 +751,7 @@ func (c *S3Client) notificationToEventsInfo(ninfo notification.Info) []EventInfo
Host: record.Source.Host,
Port: record.Source.Port,
UserAgent: record.Source.UserAgent,
VersionID: record.S3.Object.VersionID,
}
} else if strings.HasPrefix(record.EventName, "s3:ObjectCreated:PutLegalHold") {
eventsInfo[i] = EventInfo{
Expand All @@ -761,6 +763,7 @@ func (c *S3Client) notificationToEventsInfo(ninfo notification.Info) []EventInfo
Host: record.Source.Host,
Port: record.Source.Port,
UserAgent: record.Source.UserAgent,
VersionID: record.S3.Object.VersionID,
}
} else {
eventsInfo[i] = EventInfo{
Expand All @@ -772,6 +775,7 @@ func (c *S3Client) notificationToEventsInfo(ninfo notification.Info) []EventInfo
Host: record.Source.Host,
Port: record.Source.Port,
UserAgent: record.Source.UserAgent,
VersionID: record.S3.Object.VersionID,
}
}
} else {
Expand All @@ -784,6 +788,7 @@ func (c *S3Client) notificationToEventsInfo(ninfo notification.Info) []EventInfo
Host: record.Source.Host,
Port: record.Source.Port,
UserAgent: record.Source.UserAgent,
VersionID: record.S3.Object.VersionID,
}
}
}
Expand Down Expand Up @@ -1087,6 +1092,7 @@ func (c *S3Client) Put(ctx context.Context, reader io.Reader, size int64, progre
}

opts := minio.PutObjectOptions{
Internal: minio.AdvancedPutOptions{SourceVersionID: putOpts.versionID},
UserMetadata: metadata,
UserTags: tagsMap,
Progress: progress,
Expand Down
1 change: 1 addition & 0 deletions cmd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ type PutOptions struct {
concurrentStream bool
ifNotExists bool
checksum minio.ChecksumType
versionID string
}

// StatOptions holds options of the HEAD operation
Expand Down
1 change: 1 addition & 0 deletions cmd/common-methods.go
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,7 @@ func uploadSourceToTargetURL(ctx context.Context, uploadOpts uploadSourceToTarge
multipartThreads: uint(multipartThreads),
ifNotExists: uploadOpts.ifNotExists,
checksum: uploadOpts.urls.checksum,
versionID: uploadOpts.urls.TargetContent.VersionID,
}

if isReadAt(reader) || length == 0 {
Expand Down
41 changes: 36 additions & 5 deletions cmd/mirror-main.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ var (
Name: "disable-multipart",
Usage: "disable multipart upload feature",
},
cli.BoolFlag{
Name: "with-versioning",
Usage: "upload or remove with versioning enabled on a bucket",
},
cli.StringSliceFlag{
Name: "exclude",
Usage: "exclude object(s) that match specified object name pattern",
Expand Down Expand Up @@ -223,6 +227,9 @@ EXAMPLES:
16. Cross mirror between sites in a active-active deployment.
Site-A: {{.Prompt}} {{.HelpName}} --active-active siteA siteB
Site-B: {{.Prompt}} {{.HelpName}} --active-active siteB siteA

17. Mirror a bucket from MinIO cloud storage to other minIO cloud storage with versioning enabled.
{{.Prompt}} {{.HelpName}} --with-versioning myminio/bucket1 otherminio/bucket2
`,
}

Expand Down Expand Up @@ -372,7 +379,18 @@ func (mj *mirrorJob) doRemove(ctx context.Context, sURLs URLs, event EventInfo)
if mj.opts.isFake {
return sURLs.WithError(nil)
}

// remote S3 to local file system
if sURLs.SourceContent != nil && sURLs.TargetAlias == "" {
// Construct proper path with alias.
sourceWithAlias := filepath.Join(sURLs.SourceAlias, sURLs.SourceContent.URL.Path)
sourceCient, pErr := newClient(sourceWithAlias)
if pErr != nil {
return sURLs.WithError(pErr)
}
if _, err := sourceCient.Stat(ctx, StatOptions{headOnly: true}); err == nil {
return sURLs.WithError(nil)
}
}
// Construct proper path with alias.
targetWithAlias := filepath.Join(sURLs.TargetAlias, sURLs.TargetContent.URL.Path)
clnt, pErr := newClient(targetWithAlias)
Expand All @@ -385,7 +403,7 @@ func (mj *mirrorJob) doRemove(ctx context.Context, sURLs URLs, event EventInfo)
clnt.AddUserAgent(uaMirrorAppName, ReleaseTag)
}
contentCh := make(chan *ClientContent, 1)
contentCh <- &ClientContent{URL: *newClientURL(sURLs.TargetContent.URL.Path)}
contentCh <- &ClientContent{URL: *newClientURL(sURLs.TargetContent.URL.Path), VersionID: sURLs.TargetContent.VersionID}
close(contentCh)
isRemoveBucket := false
resultCh := clnt.Remove(ctx, false, isRemoveBucket, false, false, contentCh)
Expand Down Expand Up @@ -697,6 +715,10 @@ func (mj *mirrorJob) watchMirrorEvents(ctx context.Context, events []EventInfo)

if strings.HasPrefix(string(event.Type), "s3:ObjectCreated:") {
sourceModTime, _ := time.Parse(time.RFC3339Nano, event.Time)
targetContent := &ClientContent{URL: *targetURL}
if mj.opts.enableVersion {
targetContent = &ClientContent{URL: *targetURL, VersionID: event.VersionID}
}
mirrorURL := URLs{
SourceAlias: sourceAlias,
SourceContent: &ClientContent{
Expand All @@ -708,7 +730,7 @@ func (mj *mirrorJob) watchMirrorEvents(ctx context.Context, events []EventInfo)
Metadata: event.UserMetadata,
},
TargetAlias: targetAlias,
TargetContent: &ClientContent{URL: *targetURL},
TargetContent: targetContent,
MD5: mj.opts.md5,
checksum: mj.opts.checksum,
DisableMultipart: mj.opts.disableMultipart,
Expand All @@ -734,11 +756,15 @@ func (mj *mirrorJob) watchMirrorEvents(ctx context.Context, events []EventInfo)
// Ignore delete cascading delete events if cyclical.
continue
}
targetContent := &ClientContent{URL: *targetURL}
if mj.opts.enableVersion {
targetContent = &ClientContent{URL: *targetURL, VersionID: event.VersionID}
}
mirrorURL := URLs{
SourceAlias: sourceAlias,
SourceContent: nil,
SourceContent: &ClientContent{URL: *sourceURL},
TargetAlias: targetAlias,
TargetContent: &ClientContent{URL: *targetURL},
TargetContent: targetContent,
MD5: mj.opts.md5,
checksum: mj.opts.checksum,
DisableMultipart: mj.opts.disableMultipart,
Expand Down Expand Up @@ -1013,6 +1039,7 @@ func runMirror(ctx context.Context, srcURL, dstURL string, cli *cli.Context, enc
md5: md5,
checksum: checksum,
disableMultipart: cli.Bool("disable-multipart"),
enableVersion: cli.Bool("with-versioning"),
skipErrors: cli.Bool("skip-errors"),
excludeOptions: cli.StringSlice("exclude"),
excludeBuckets: cli.StringSlice("exclude-bucket"),
Expand Down Expand Up @@ -1041,6 +1068,10 @@ func runMirror(ctx context.Context, srcURL, dstURL string, cli *cli.Context, enc
mirrorSrcBuckets := srcClt.GetURL().Type == objectStorage && srcClt.GetURL().Path == string(srcClt.GetURL().Separator)
mirrorBucketsToBuckets := mirrorSrcBuckets && createDstBuckets

if cli.Bool("with-versioning") && (!checkIfBucketIsVersioned(ctx, srcURL) || !checkIfBucketIsVersioned(ctx, dstURL)) {
fatalIf(errInvalidArgument().Trace(cli.Command.Name), "You cannot specify --with-versioning in versioning not enabled buckets")
}

if mirrorSrcBuckets || createDstBuckets {
// Synchronize buckets using dirDifference function
for d := range bucketDifference(ctx, srcClt, dstClt, mj.opts) {
Expand Down
2 changes: 1 addition & 1 deletion cmd/mirror-url.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ type mirrorOptions struct {
skipErrors bool
excludeOptions, excludeStorageClasses, excludeBuckets []string
encKeyDB map[string][]prefixSSEPair
md5, disableMultipart bool
md5, disableMultipart, enableVersion bool
olderThan, newerThan string
storageClass string
userMetadata map[string]string
Expand Down
1 change: 1 addition & 0 deletions cmd/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type EventInfo struct {
Port string
UserAgent string
Type notification.EventType
VersionID string
}

// WatchOptions contains watch configuration options
Expand Down