Skip to content

Commit

Permalink
measure transfer speed when copying file
Browse files Browse the repository at this point in the history
  • Loading branch information
leafo committed Nov 22, 2023
1 parent 056873e commit 0f55aab
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 4 deletions.
29 changes: 25 additions & 4 deletions zipserver/copy_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"fmt"
"io/ioutil"

Check failure on line 7 in zipserver/copy_handler.go

View workflow job for this annotation

GitHub Actions / build

imported and not used: "io/ioutil"
"log"
"net/http"
"net/url"
Expand All @@ -12,6 +13,19 @@ import (

var copyLockTable = NewLockTable()

func formatBytes(b float64) string {
const unit = 1024
if b < unit {
return fmt.Sprintf("%.2f B", b)
}
div, exp := float64(unit), 0
for n := b / unit; n >= unit; n /= unit {
div *= unit
exp++
}
return fmt.Sprintf("%.2f %cB", b/div, "kMGTPE"[exp])
}

// notify the callback URL of task completion
func notifyCallback(callbackURL string, resValues url.Values) error {
notifyCtx, notifyCancel := context.WithTimeout(context.Background(), time.Duration(config.AsyncNotificationTimeout))
Expand Down Expand Up @@ -89,27 +103,34 @@ func copyHandler(w http.ResponseWriter, r *http.Request) error {
defer reader.Close()

if err != nil {
log.Print("Failed to get file", err)
log.Print("Failed to get file: ", err)
notifyError(callbackURL, err)
return
}

mReader := newMeasuredReader(reader)

// transfer the reader to s3
// TODO: get the actual mime type from the GetFile request
log.Print("Starting transfer: ", key)
err = targetStorage.PutFile(jobCtx, config.S3Bucket, key, reader, "application/octet-stream")
err = targetStorage.PutFile(jobCtx, config.S3Bucket, key, mReader, "application/octet-stream")

if err != nil {
log.Print("Failed to copy file: ", err)
notifyError(callbackURL, err)
return
}

log.Print("Transfer complete " + callbackURL)
log.Print("Transfer complete: ", key,
", bytes read: ", formatBytes(float64(mReader.BytesRead)),
", duration: ", mReader.Duration.Seconds(),
", speed: ", formatBytes(mReader.TransferSpeed()), "/s")

resValues := url.Values{}
resValues.Add("Success", "true")
resValues.Add("Key", key)
resValues.Add("Duration", fmt.Sprintf("%f", time.Since(startTime).Seconds()))
resValues.Add("Duration", fmt.Sprintf("%.4fs", time.Since(startTime).Seconds()))
resValues.Add("Size", fmt.Sprintf("%d", mReader.BytesRead))

notifyCallback(callbackURL, resValues)
})()
Expand Down
32 changes: 32 additions & 0 deletions zipserver/readers.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"io"
"log"
"time"
)

type readerClosure func(p []byte) (int, error)
Expand Down Expand Up @@ -35,3 +36,34 @@ func limitedReader(reader io.Reader, maxBytes uint64, totalBytes *uint64) reader
return bytesRead, err
}
}

type measuredReader struct {
reader io.Reader // The underlying reader
BytesRead int64 // Total bytes read
StartTime time.Time // Time when reading started
Duration time.Duration // Duration of the read operation
}

func newMeasuredReader(r io.Reader) *measuredReader {
return &measuredReader{
reader: r,
StartTime: time.Now(),
}
}

// Read reads data from the underlying io.Reader, tracking the bytes read and duration
func (mr *measuredReader) Read(p []byte) (int, error) {
n, err := mr.reader.Read(p)
mr.BytesRead += int64(n)
mr.Duration = time.Since(mr.StartTime)

return n, err
}

// TransferSpeed returns the average transfer speed in bytes per second
func (mr *measuredReader) TransferSpeed() float64 {
if mr.Duration.Seconds() == 0 {
return 0
}
return float64(mr.BytesRead) / mr.Duration.Seconds()
}

0 comments on commit 0f55aab

Please sign in to comment.