
Commit

review fixes
urykhy committed Oct 29, 2022
1 parent 33fd38e commit 1373c66
Showing 5 changed files with 113 additions and 70 deletions.
16 changes: 3 additions & 13 deletions aes.go
@@ -7,11 +7,6 @@ import (
"fmt"
)

const (
// in FileWriter we use chunks upto aesChunkSize bytes to encrypt data
aesChunkSize = 1024 * 1024
)

// calculateIV `shifts` IV to given offset
// based on calculateIV from AesCtrCryptoCodec.java
func calculateIV(offset int64, initIV []byte) ([]byte, error) {
@@ -37,10 +32,8 @@ func calculateIV(offset int64, initIV []byte) ([]byte, error) {
return iv, nil
}

// aesCtrStep perform AES-CTR XOR operation on given byte string.
// Once encryption and decryption are exactly the same operation for CTR mode,
// this function can be used to perform both.
func aesCtrStep(offset int64, enc *transparentEncryptionInfo, b []byte) ([]byte, error) {
// aesCreateCTRStream creates a cipher.Stream to encrypt/decrypt data starting from a specific offset
func aesCreateCTRStream(offset int64, enc *transparentEncryptionInfo) (cipher.Stream, error) {
iv, err := calculateIV(offset, enc.iv)
if err != nil {
return nil, err
@@ -61,8 +54,5 @@ func aesCtrStep(offset int64, enc *transparentEncryptionInfo, b []byte) ([]byte,
tmp := make([]byte, padding)
stream.XORKeyStream(tmp, tmp)
}

text := make([]byte, len(b))
stream.XORKeyStream(text, b)
return text, nil
return stream, nil
}
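
Editor's note: a minimal, self-contained sketch of the pattern aesCreateCTRStream follows, with a hypothetical helper signature rather than the library's exact code. The idea is to advance the IV counter to the 16-byte block containing the offset (as calculateIV does), then discard the leftover keystream bytes so the stream is positioned exactly at the requested byte.

package example

import (
	"crypto/aes"
	"crypto/cipher"
)

// ctrStreamAt returns an AES-CTR stream positioned at an arbitrary byte
// offset. shiftIV stands in for a calculateIV-style helper that adds
// offset/aes.BlockSize to the big-endian counter embedded in the IV.
func ctrStreamAt(key, initIV []byte, offset int64,
	shiftIV func(offset int64, initIV []byte) ([]byte, error)) (cipher.Stream, error) {
	block, err := aes.NewCipher(key)
	if err != nil {
		return nil, err
	}
	iv, err := shiftIV(offset, initIV) // counter advanced by whole 16-byte blocks
	if err != nil {
		return nil, err
	}
	stream := cipher.NewCTR(block, iv)
	// The stream now starts at the beginning of the block that contains
	// offset; throw away the keystream bytes before offset itself.
	if padding := offset % aes.BlockSize; padding > 0 {
		tmp := make([]byte, padding)
		stream.XORKeyStream(tmp, tmp)
	}
	return stream, nil
}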
23 changes: 21 additions & 2 deletions aes_test.go
@@ -1,12 +1,31 @@
package hdfs

import (
"bytes"
"crypto/cipher"
"testing"

"github.com/stretchr/testify/assert"
)

func TestAesChunks(t *testing.T) {
// aesCtrStep performs the AES-CTR XOR operation on the given byte string.
// Since encryption and decryption are exactly the same operation in CTR mode,
// this function can be used to perform both.
func aesCtrStep(offset int64, enc *transparentEncryptionInfo, b []byte) ([]byte, error) {
stream, err := aesCreateCTRStream(offset, enc)
if err != nil {
return nil, err
}

r := make([]byte, len(b))
_, err = cipher.StreamReader{S: stream, R: bytes.NewReader(b)}.Read(r)
if err != nil {
return nil, err
}
return r, nil
}

func TestAesIV(t *testing.T) {
originalText := []byte("some random plain text, nice to have it quite long")
key := []byte("0123456789abcdef")

@@ -46,5 +65,5 @@ func TestAesChunks(t *testing.T) {
decryptedByChunks = append(decryptedByChunks, tmp...)
pos += int64(x)
}
assert.Equal(t, decryptedByChunks, originalText)
assert.Equal(t, decryptedByChunks, decryptedText)
}
42 changes: 22 additions & 20 deletions file_reader.go
@@ -40,6 +40,7 @@ type transparentEncryptionInfo struct {
key []byte
iv []byte
cipher cipher.Block
stream cipher.Stream
}

// Open returns an FileReader which can be used for reading.
@@ -184,6 +185,12 @@ func (f *FileReader) Seek(offset int64, whence int) (int64, error) {
if f.offset != off {
f.offset = off

// To keep things simple, we just discard the cipher.Stream (if any).
// It will be recreated in Read()
if f.enc != nil {
f.enc.stream = nil
}

if f.blockReader != nil {
// If the seek is within the next few chunks, it's much more
// efficient to throw away a few bytes than to reconnect and start
@@ -209,25 +216,6 @@ func (f *FileReader) Read(b []byte) (int, error) {
return 0, io.ErrClosedPipe
}

offset := f.offset
n, err := f.readImpl(b)

// Decrypt data chunk if file from HDFS encrypted zone.
if f.enc != nil && n > 0 {
plaintext, err := aesCtrStep(offset, f.enc, b[:n])
if err != nil {
f.offset = offset
return 0, err
}
for i := 0; i < n; i++ {
b[i] = plaintext[i]
}
}

return n, err
}

func (f *FileReader) readImpl(b []byte) (int, error) {
if f.info.IsDir() {
return 0, &os.PathError{
"read",
@@ -259,7 +247,21 @@ func (f *FileReader) readImpl(b []byte) (int, error) {
}
}

n, err := f.blockReader.Read(b)
var n int
var err error

if f.enc != nil {
if f.enc.stream == nil {
f.enc.stream, err = aesCreateCTRStream(f.offset, f.enc)
if err != nil {
return 0, err
}
}
n, err = cipher.StreamReader{S: f.enc.stream, R: f.blockReader}.Read(b)
} else {
n, err = f.blockReader.Read(b)
}

f.offset += int64(n)

if err != nil && err != io.EOF {
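
Editor's note: the reader change above boils down to wrapping the block reader in a cipher.StreamReader and lazily rebuilding the stream after a seek. A minimal sketch of that pattern, using hypothetical names (encryptedReader, newStream) rather than the library's types:

package example

import (
	"crypto/cipher"
	"io"
)

// encryptedReader transparently decrypts an AES-CTR-encrypted source.
type encryptedReader struct {
	src       io.ReadSeeker
	offset    int64
	stream    cipher.Stream
	newStream func(offset int64) (cipher.Stream, error) // e.g. built on aesCreateCTRStream
}

func (r *encryptedReader) Read(b []byte) (int, error) {
	if r.stream == nil {
		s, err := r.newStream(r.offset)
		if err != nil {
			return 0, err
		}
		r.stream = s
	}
	// StreamReader XORs the keystream over whatever src returns.
	n, err := cipher.StreamReader{S: r.stream, R: r.src}.Read(b)
	r.offset += int64(n)
	return n, err
}

func (r *encryptedReader) Seek(offset int64, whence int) (int64, error) {
	off, err := r.src.Seek(offset, whence)
	if err == nil && off != r.offset {
		r.offset = off
		// The keystream position no longer matches the file offset;
		// drop the stream so Read recreates it at the new offset.
		r.stream = nil
	}
	return off, err
}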
50 changes: 20 additions & 30 deletions file_writer.go
@@ -1,6 +1,7 @@
package hdfs

import (
"crypto/cipher"
"errors"
"os"
"time"
@@ -204,28 +205,6 @@ func (f *FileWriter) SetDeadline(t time.Time) error {
// of this, it is important that Close is called after all data has been
// written.
func (f *FileWriter) Write(b []byte) (int, error) {
// Encrypt data chunk if file in HDFS encrypted zone.
if f.enc != nil && len(b) > 0 {
var offset int
for offset < len(b) {
size := min(len(b)-offset, aesChunkSize)
ciphertext, err := aesCtrStep(f.offset, f.enc, b[offset:offset+size])
if err != nil {
return offset, err
}
writtenSize, err := f.writeImpl(ciphertext)
offset += writtenSize
if err != nil {
return offset, err
}
}
return offset, nil
} else {
return f.writeImpl(b)
}
}

func (f *FileWriter) writeImpl(b []byte) (int, error) {
if f.blockWriter == nil {
err := f.startNewBlock()
if err != nil {
@@ -235,7 +214,25 @@ func (f *FileWriter) writeImpl(b []byte) (int, error) {

off := 0
for off < len(b) {
n, err := f.blockWriter.Write(b[off:])
var n int
var err error

if f.enc != nil {
if f.enc.stream == nil {
f.enc.stream, err = aesCreateCTRStream(f.offset, f.enc)
if err != nil {
return 0, err
}
}
n, err = cipher.StreamWriter{S: f.enc.stream, W: f.blockWriter}.Write(b[off:])
// If blockWriter writes fewer bytes than expected,
// we must recreate the stream cipher, since its internal counter has already moved forward.
if n != len(b[off:]) {
f.enc.stream = nil
}
} else {
n, err = f.blockWriter.Write(b[off:])
}
off += n
f.offset += int64(n)
if err == transfer.ErrEndOfBlock {
@@ -369,10 +366,3 @@ func (f *FileWriter) finalizeBlock() error {
f.blockWriter = nil
return nil
}

func min(a, b int) int {
if a < b {
return a
}
return b
}
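
Editor's note: on the write side, cipher.StreamWriter XORs the entire buffer before handing it to the underlying writer, so a short write leaves the keystream ahead of the bytes that actually reached storage. A minimal sketch of that invalidation rule, again with hypothetical names rather than the library's exact types:

package example

import (
	"crypto/cipher"
	"io"
)

// encryptedWriter encrypts data with AES-CTR before passing it on.
type encryptedWriter struct {
	dst       io.Writer
	offset    int64
	stream    cipher.Stream
	newStream func(offset int64) (cipher.Stream, error)
}

func (w *encryptedWriter) Write(b []byte) (int, error) {
	off := 0
	for off < len(b) {
		if w.stream == nil {
			s, err := w.newStream(w.offset)
			if err != nil {
				return off, err
			}
			w.stream = s
		}
		n, err := cipher.StreamWriter{S: w.stream, W: w.dst}.Write(b[off:])
		if n != len(b[off:]) {
			// The keystream advanced over all of b[off:], but only n bytes
			// were written; rebuild the stream at the real offset next time.
			w.stream = nil
		}
		off += n
		w.offset += int64(n)
		if err != nil {
			return off, err
		}
	}
	return off, nil
}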
52 changes: 47 additions & 5 deletions file_writer_test.go
@@ -6,6 +6,7 @@ import (
"io/ioutil"
"math/rand"
"os"
"os/exec"
"path/filepath"
"strings"
"testing"
@@ -560,6 +561,10 @@ func TestEncryptedZoneWriteChunks(t *testing.T) {
bytes, err := ioutil.ReadAll(reader)
require.NoError(t, err)
assert.Equal(t, originalText, bytes)

hdfsOut, err := exec.Command("hadoop", "dfs", "-cat", "/_test/kms/write_chunks.txt").Output()
require.NoError(t, err)
assert.Equal(t, originalText, hdfsOut)
}

func TestEncryptedZoneAppendChunks(t *testing.T) {
@@ -586,28 +591,65 @@ func TestEncryptedZoneAppendChunks(t *testing.T) {
bytes, err := ioutil.ReadAll(reader)
require.NoError(t, err)
assert.Equal(t, originalText, bytes)

hdfsOut, err := exec.Command("hadoop", "dfs", "-cat", "/_test/kms/append_chunks.txt").Output()
require.NoError(t, err)
assert.Equal(t, originalText, hdfsOut)
}

func TestEncryptedZoneLargeBlock(t *testing.T) {
skipWithoutEncryptedZone(t)

// Generate quite large (aesChunkSize * 1.5 bytes) block, so we can trigger encryption in chunks.
str := "some random text"
originalText := []byte(strings.Repeat(str, aesChunkSize*1.5/len(str)))
// Generate a fairly large data block, so we can trigger encryption in chunks.
mobydick, err := os.Open("testdata/mobydick.txt")
require.NoError(t, err)
originalText, err := ioutil.ReadAll(mobydick)
require.NoError(t, err)
client := getClient(t)

// Create the file with a small (128 KiB) block size, so the encrypted data spans multiple HDFS blocks.
writer, err := client.CreateFile("/_test/kms/large_write.txt", 1, 131072, 0755)
writer, err := client.CreateFile("/_test/kms/mobydick.unittest", 1, 131072, 0755)
require.NoError(t, err)

_, err = writer.Write(originalText)
require.NoError(t, err)
assertClose(t, writer)

reader, err := client.Open("/_test/kms/large_write.txt")
reader, err := client.Open("/_test/kms/mobydick.unittest")
require.NoError(t, err)
bytes, err := ioutil.ReadAll(reader)
require.NoError(t, err)
assert.Equal(t, originalText, bytes)

// Ensure read after seek works as expected:
_, err = reader.Seek(35657, io.SeekStart)
require.NoError(t, err)
bytes = make([]byte, 64)
_, err = reader.Read(bytes)
require.NoError(t, err)
assert.Equal(t, []byte("By reason of these things, then, the whaling voyage was welcome;"), bytes)

hdfsOut, err := exec.Command("hadoop", "dfs", "-cat", "/_test/kms/mobydick.unittest").Output()
require.NoError(t, err)
assert.Equal(t, originalText, hdfsOut)
}

func TestEncryptedZoneReadAfterJava(t *testing.T) {
skipWithoutEncryptedZone(t)

err := exec.Command("hadoop", "dfs", "-copyFromLocal", "testdata/mobydick.txt", "/_test/kms/mobydick.java").Run()
require.NoError(t, err)

mobydick, err := os.Open("testdata/mobydick.txt")
require.NoError(t, err)
originalText, err := ioutil.ReadAll(mobydick)
require.NoError(t, err)

client := getClient(t)
reader, err := client.Open("/_test/kms/mobydick.java")
require.NoError(t, err)
bytes, err := ioutil.ReadAll(reader)
require.NoError(t, err)

assert.Equal(t, originalText, bytes)
}
