diff --git a/aes.go b/aes.go
index c96e9f45..5cc7bbb3 100644
--- a/aes.go
+++ b/aes.go
@@ -7,11 +7,6 @@ import (
 	"fmt"
 )
 
-const (
-	// in FileWriter we use chunks upto aesChunkSize bytes to encrypt data
-	aesChunkSize = 1024 * 1024
-)
-
 // calculateIV `shifts` IV to given offset
 // based on calculateIV from AesCtrCryptoCodec.java
 func calculateIV(offset int64, initIV []byte) ([]byte, error) {
@@ -37,10 +32,8 @@ func calculateIV(offset int64, initIV []byte) ([]byte, error) {
 	return iv, nil
 }
 
-// aesCtrStep perform AES-CTR XOR operation on given byte string.
-// Once encryption and decryption are exactly the same operation for CTR mode,
-// this function can be used to perform both.
-func aesCtrStep(offset int64, enc *transparentEncryptionInfo, b []byte) ([]byte, error) {
+// aesCreateCTRStream creates a stream to encrypt/decrypt data starting at the given offset.
+func aesCreateCTRStream(offset int64, enc *transparentEncryptionInfo) (cipher.Stream, error) {
 	iv, err := calculateIV(offset, enc.iv)
 	if err != nil {
 		return nil, err
@@ -61,8 +54,5 @@ func aesCtrStep(offset int64, enc *transparentEncryptionInfo, b []byte) ([]byte,
 		tmp := make([]byte, padding)
 		stream.XORKeyStream(tmp, tmp)
 	}
-
-	text := make([]byte, len(b))
-	stream.XORKeyStream(text, b)
-	return text, nil
+	return stream, nil
 }
diff --git a/aes_test.go b/aes_test.go
index df9ab672..26fe7ce7 100644
--- a/aes_test.go
+++ b/aes_test.go
@@ -1,12 +1,31 @@
 package hdfs
 
 import (
+	"bytes"
+	"crypto/cipher"
 	"testing"
 
 	"github.com/stretchr/testify/assert"
 )
 
-func TestAesChunks(t *testing.T) {
+// aesCtrStep performs the AES-CTR XOR operation on the given byte slice.
+// Since encryption and decryption are exactly the same operation in CTR mode,
+// this function can be used to perform both.
+func aesCtrStep(offset int64, enc *transparentEncryptionInfo, b []byte) ([]byte, error) {
+	stream, err := aesCreateCTRStream(offset, enc)
+	if err != nil {
+		return nil, err
+	}
+
+	r := make([]byte, len(b))
+	_, err = cipher.StreamReader{S: stream, R: bytes.NewReader(b)}.Read(r)
+	if err != nil {
+		return nil, err
+	}
+	return r, nil
+}
+
+func TestAesIV(t *testing.T) {
 	originalText := []byte("some random plain text, nice to have it quite long")
 	key := []byte("0123456789abcdef")
 
@@ -46,5 +65,5 @@ func TestAesChunks(t *testing.T) {
 		decryptedByChunks = append(decryptedByChunks, tmp...)
 		pos += int64(x)
 	}
-	assert.Equal(t, decryptedByChunks, originalText)
+	assert.Equal(t, decryptedByChunks, decryptedText)
 }
diff --git a/file_reader.go b/file_reader.go
index c93c87ab..bfe699d1 100644
--- a/file_reader.go
+++ b/file_reader.go
@@ -40,6 +40,7 @@ type transparentEncryptionInfo struct {
 	key    []byte
 	iv     []byte
 	cipher cipher.Block
+	stream cipher.Stream
 }
 
 // Open returns an FileReader which can be used for reading.
@@ -184,6 +185,12 @@ func (f *FileReader) Seek(offset int64, whence int) (int64, error) {
 	if f.offset != off {
 		f.offset = off
 
+		// To keep things simple, we just discard the cipher.Stream (if any);
+		// it will be recreated on the next Read().
+		if f.enc != nil {
+			f.enc.stream = nil
+		}
+
 		if f.blockReader != nil {
 			// If the seek is within the next few chunks, it's much more
 			// efficient to throw away a few bytes than to reconnect and start
@@ -209,25 +216,6 @@ func (f *FileReader) Read(b []byte) (int, error) {
 		return 0, io.ErrClosedPipe
 	}
 
-	offset := f.offset
-	n, err := f.readImpl(b)
-
-	// Decrypt data chunk if file from HDFS encrypted zone.
-	if f.enc != nil && n > 0 {
-		plaintext, err := aesCtrStep(offset, f.enc, b[:n])
-		if err != nil {
-			f.offset = offset
-			return 0, err
-		}
-		for i := 0; i < n; i++ {
-			b[i] = plaintext[i]
-		}
-	}
-
-	return n, err
-}
-
-func (f *FileReader) readImpl(b []byte) (int, error) {
 	if f.info.IsDir() {
 		return 0, &os.PathError{
 			"read",
@@ -259,7 +247,21 @@ func (f *FileReader) readImpl(b []byte) (int, error) {
 		}
 	}
 
-	n, err := f.blockReader.Read(b)
+	var n int
+	var err error
+
+	if f.enc != nil {
+		if f.enc.stream == nil {
+			f.enc.stream, err = aesCreateCTRStream(f.offset, f.enc)
+			if err != nil {
+				return 0, err
+			}
+		}
+		n, err = cipher.StreamReader{S: f.enc.stream, R: f.blockReader}.Read(b)
+	} else {
+		n, err = f.blockReader.Read(b)
+	}
+
 	f.offset += int64(n)
 
 	if err != nil && err != io.EOF {
diff --git a/file_writer.go b/file_writer.go
index 3fe558ce..c83b5788 100644
--- a/file_writer.go
+++ b/file_writer.go
@@ -1,6 +1,7 @@
 package hdfs
 
 import (
+	"crypto/cipher"
 	"errors"
 	"os"
 	"time"
@@ -204,28 +205,6 @@ func (f *FileWriter) SetDeadline(t time.Time) error {
 // of this, it is important that Close is called after all data has been
 // written.
 func (f *FileWriter) Write(b []byte) (int, error) {
-	// Encrypt data chunk if file in HDFS encrypted zone.
-	if f.enc != nil && len(b) > 0 {
-		var offset int
-		for offset < len(b) {
-			size := min(len(b)-offset, aesChunkSize)
-			ciphertext, err := aesCtrStep(f.offset, f.enc, b[offset:offset+size])
-			if err != nil {
-				return offset, err
-			}
-			writtenSize, err := f.writeImpl(ciphertext)
-			offset += writtenSize
-			if err != nil {
-				return offset, err
-			}
-		}
-		return offset, nil
-	} else {
-		return f.writeImpl(b)
-	}
-}
-
-func (f *FileWriter) writeImpl(b []byte) (int, error) {
 	if f.blockWriter == nil {
 		err := f.startNewBlock()
 		if err != nil {
@@ -235,7 +214,25 @@ func (f *FileWriter) writeImpl(b []byte) (int, error) {
 
 	off := 0
 	for off < len(b) {
-		n, err := f.blockWriter.Write(b[off:])
+		var n int
+		var err error
+
+		if f.enc != nil {
+			if f.enc.stream == nil {
+				f.enc.stream, err = aesCreateCTRStream(f.offset, f.enc)
+				if err != nil {
+					return 0, err
+				}
+			}
+			n, err = cipher.StreamWriter{S: f.enc.stream, W: f.blockWriter}.Write(b[off:])
+			// If blockWriter writes fewer bytes than expected, we must recreate the
+			// stream cipher, since its internal counter has already advanced.
+			if n != len(b[off:]) {
+				f.enc.stream = nil
+			}
+		} else {
+			n, err = f.blockWriter.Write(b[off:])
+		}
 		off += n
 		f.offset += int64(n)
 		if err == transfer.ErrEndOfBlock {
@@ -369,10 +366,3 @@ func (f *FileWriter) finalizeBlock() error {
 	f.blockWriter = nil
 	return nil
 }
-
-func min(a, b int) int {
-	if a < b {
-		return a
-	}
-	return b
-}
diff --git a/file_writer_test.go b/file_writer_test.go
index 6d30d9cb..fbc2d02e 100644
--- a/file_writer_test.go
+++ b/file_writer_test.go
@@ -6,6 +6,7 @@ import (
 	"io/ioutil"
 	"math/rand"
 	"os"
+	"os/exec"
 	"path/filepath"
 	"strings"
 	"testing"
@@ -560,6 +561,10 @@ func TestEncryptedZoneWriteChunks(t *testing.T) {
 	bytes, err := ioutil.ReadAll(reader)
 	require.NoError(t, err)
 	assert.Equal(t, originalText, bytes)
+
+	hdfsOut, err := exec.Command("hadoop", "dfs", "-cat", "/_test/kms/write_chunks.txt").Output()
+	require.NoError(t, err)
+	assert.Equal(t, originalText, hdfsOut)
 }
 
 func TestEncryptedZoneAppendChunks(t *testing.T) {
@@ -586,28 +591,65 @@ func TestEncryptedZoneAppendChunks(t *testing.T) {
 	bytes, err := ioutil.ReadAll(reader)
 	require.NoError(t, err)
 	assert.Equal(t, originalText, bytes)
+
+	hdfsOut, err := exec.Command("hadoop", "dfs", "-cat", "/_test/kms/append_chunks.txt").Output()
+	require.NoError(t, err)
+	assert.Equal(t, originalText, hdfsOut)
 }
 
 func TestEncryptedZoneLargeBlock(t *testing.T) {
 	skipWithoutEncryptedZone(t)
 
-	// Generate quite large (aesChunkSize * 1.5 bytes) block, so we can trigger encryption in chunks.
-	str := "some random text"
-	originalText := []byte(strings.Repeat(str, aesChunkSize*1.5/len(str)))
+	// Generate a fairly large data block, so we can trigger encryption in chunks.
+	mobydick, err := os.Open("testdata/mobydick.txt")
+	require.NoError(t, err)
+	originalText, err := ioutil.ReadAll(mobydick)
+	require.NoError(t, err)
 
 	client := getClient(t)
 
 	// Create file with small (128Kb) block size, so encrypted chunk will be placed over multiple hdfs blocks.
-	writer, err := client.CreateFile("/_test/kms/large_write.txt", 1, 131072, 0755)
+	writer, err := client.CreateFile("/_test/kms/mobydick.unittest", 1, 131072, 0755)
 	require.NoError(t, err)
 	_, err = writer.Write(originalText)
 	require.NoError(t, err)
 	assertClose(t, writer)
 
-	reader, err := client.Open("/_test/kms/large_write.txt")
+	reader, err := client.Open("/_test/kms/mobydick.unittest")
 	require.NoError(t, err)
+	bytes, err := ioutil.ReadAll(reader)
+	require.NoError(t, err)
+	assert.Equal(t, originalText, bytes)
 
+	// Ensure read after seek works as expected:
+	_, err = reader.Seek(35657, io.SeekStart)
+	require.NoError(t, err)
+	bytes = make([]byte, 64)
+	_, err = reader.Read(bytes)
+	require.NoError(t, err)
+	assert.Equal(t, []byte("By reason of these things, then, the whaling voyage was welcome;"), bytes)
+
+	hdfsOut, err := exec.Command("hadoop", "dfs", "-cat", "/_test/kms/mobydick.unittest").Output()
+	require.NoError(t, err)
+	assert.Equal(t, originalText, hdfsOut)
+}
+
+func TestEncryptedZoneReadAfterJava(t *testing.T) {
+	skipWithoutEncryptedZone(t)
+
+	err := exec.Command("hadoop", "dfs", "-copyFromLocal", "testdata/mobydick.txt", "/_test/kms/mobydick.java").Run()
+	require.NoError(t, err)
+
+	mobydick, err := os.Open("testdata/mobydick.txt")
+	require.NoError(t, err)
+	originalText, err := ioutil.ReadAll(mobydick)
+	require.NoError(t, err)
+
+	client := getClient(t)
+	reader, err := client.Open("/_test/kms/mobydick.java")
+	require.NoError(t, err)
 	bytes, err := ioutil.ReadAll(reader)
 	require.NoError(t, err)
+	assert.Equal(t, originalText, bytes)
 }
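
Note for reviewers: the refactoring above leans on one property of AES-CTR that the diff only shows indirectly. A cipher.Stream can be positioned at an arbitrary byte offset by adding offset/16 to the big-endian IV counter (what calculateIV does) and then discarding offset%16 bytes of keystream, which is why aesCreateCTRStream can hand a plain cipher.Stream to cipher.StreamReader/cipher.StreamWriter and recreate it cheaply after every Seek or short write. The standalone sketch below illustrates that property using only the standard library; ctrStreamAt, the hard-coded key/IV, and the offset 1000 are illustrative stand-ins, not code from this PR, and the sketch folds the 128-bit counter addition into two uint64 halves for brevity where calculateIV does it byte-wise.

package main

import (
	"bytes"
	"crypto/aes"
	"crypto/cipher"
	"encoding/binary"
	"fmt"
)

// ctrStreamAt returns an AES-CTR stream positioned at `offset` bytes into the
// message: it adds offset/BlockSize to the big-endian IV counter and then
// discards offset%BlockSize bytes of keystream.
func ctrStreamAt(block cipher.Block, iv []byte, offset int64) cipher.Stream {
	bs := int64(block.BlockSize())

	// Advance the 128-bit big-endian counter by offset/bs.
	hi := binary.BigEndian.Uint64(iv[:8])
	lo := binary.BigEndian.Uint64(iv[8:])
	add := uint64(offset / bs)
	if lo+add < lo {
		hi++ // carry into the high 64 bits
	}
	shifted := make([]byte, 16)
	binary.BigEndian.PutUint64(shifted[:8], hi)
	binary.BigEndian.PutUint64(shifted[8:], lo+add)

	stream := cipher.NewCTR(block, shifted)

	// Throw away the partial block of keystream before the requested offset.
	if pad := offset % bs; pad > 0 {
		tmp := make([]byte, pad)
		stream.XORKeyStream(tmp, tmp)
	}
	return stream
}

func main() {
	key := []byte("0123456789abcdef") // 16-byte AES-128 key (illustrative)
	iv := []byte("fedcba9876543210")  // 16-byte initial IV (illustrative)
	plaintext := bytes.Repeat([]byte("hdfs transparent encryption "), 100)

	block, err := aes.NewCipher(key)
	if err != nil {
		panic(err)
	}

	// Encrypt the whole message starting from offset 0.
	ciphertext := make([]byte, len(plaintext))
	cipher.NewCTR(block, iv).XORKeyStream(ciphertext, plaintext)

	// Decrypt only a slice that starts at a non block-aligned offset.
	const off = 1000
	decrypted := make([]byte, len(ciphertext)-off)
	ctrStreamAt(block, iv, off).XORKeyStream(decrypted, ciphertext[off:])

	fmt.Println(bytes.Equal(decrypted, plaintext[off:])) // true
}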