Skip to content

Commit

Permalink
initial encrypted zone support
Browse files Browse the repository at this point in the history
  • Loading branch information
urykhy committed Mar 7, 2022
1 parent ff27ef8 commit 4d0d7ec
Show file tree
Hide file tree
Showing 9 changed files with 530 additions and 8 deletions.
68 changes: 68 additions & 0 deletions aes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package hdfs

import (
"crypto/aes"
"crypto/cipher"
"encoding/binary"
"fmt"
)

const (
// FileWriter encrypts data in chunks of up to aesChunkSize bytes.
aesChunkSize = 1024 * 1024
)

// calculateIV returns the AES-CTR counter block to use for data located at
// the given byte offset. It treats initIV as a 128-bit big-endian integer and
// adds to it the number of whole AES blocks that precede offset.
// Based on calculateIV from AesCtrCryptoCodec.java.
//
// An error is returned when initIV is not exactly aes.BlockSize bytes long or
// when offset is negative.
func calculateIV(offset int64, initIV []byte) ([]byte, error) {
	if len(initIV) != aes.BlockSize {
		return nil, fmt.Errorf("calculateIV: invalid iv size: %v", len(initIV))
	}
	// A negative offset has no meaningful block index; converting it to an
	// unsigned counter would silently yield a wrong IV, so reject it here.
	if offset < 0 {
		return nil, fmt.Errorf("calculateIV: invalid offset: %v", offset)
	}

	counter := uint64(offset / aes.BlockSize)

	high := binary.BigEndian.Uint64(initIV[:8])
	low := binary.BigEndian.Uint64(initIV[8:])

	sum := low + counter
	if sum < low { // the low half wrapped around: carry into the high half
		high++
	}

	iv := make([]byte, aes.BlockSize)
	binary.BigEndian.PutUint64(iv[:8], high)
	binary.BigEndian.PutUint64(iv[8:], sum)

	return iv, nil
}

// aesCtrStep applies the AES-CTR keystream to b, assuming b starts at the
// given byte offset of the stream, and returns the result in a new slice.
// Since encryption and decryption are the same XOR operation in CTR mode,
// this function serves for both directions.
func aesCtrStep(offset int64, enc *TransparentEncryptionInfo, b []byte) ([]byte, error) {
	counterBlock, err := calculateIV(offset, enc.iv)
	if err != nil {
		return nil, err
	}

	// Build the block cipher on first use and keep it on enc for later calls.
	if enc.cipher == nil {
		block, err := aes.NewCipher(enc.key)
		if err != nil {
			return nil, err
		}
		enc.cipher = block
	}

	ctr := cipher.NewCTR(enc.cipher, counterBlock)

	// The counter block addresses whole AES blocks only. When offset falls
	// inside a block, consume the keystream bytes that belong to the part of
	// the block before offset.
	if skip := offset % aes.BlockSize; skip > 0 {
		discard := make([]byte, skip)
		ctr.XORKeyStream(discard, discard)
	}

	out := make([]byte, len(b))
	ctr.XORKeyStream(out, b)
	return out, nil
}
50 changes: 50 additions & 0 deletions aes_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package hdfs

import (
"testing"

"github.com/stretchr/testify/assert"
)

// TestAesChunks checks that aesCtrStep round-trips data and that processing a
// buffer chunk by chunk, each chunk at its own offset, yields the same bytes
// as processing the whole buffer in a single call.
func TestAesChunks(t *testing.T) {
	originalText := []byte("some random plain text, nice to have it quite long")
	key := []byte("0123456789abcdef")

	// The low 64 bits of the IV are all ones, so the counter wraps as soon as
	// the second AES block (offset 16) is reached. This makes the carry into
	// the high half happen inside the 50-byte text used here.
	iv := []byte("\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\xff\xff\xff\xff\xff\xff")
	enc := &TransparentEncryptionInfo{iv: iv, key: key}

	// Chunk sizes are not multiples of the AES block size and add up to
	// len(originalText).
	chunkSizes := []int{5, 7, 6, 4, 28}

	// byChunks runs aesCtrStep over src one chunk at a time, passing the
	// position of each chunk as its offset, and concatenates the outputs.
	byChunks := func(src []byte) []byte {
		out := make([]byte, 0, len(src))
		var pos int64
		for _, size := range chunkSizes {
			tmp, err := aesCtrStep(pos, enc, src[pos:pos+int64(size)])
			assert.NoError(t, err)
			out = append(out, tmp...)
			pos += int64(size)
		}
		return out
	}

	// Ensure that we can decrypt text after encryption.
	// In CTR mode `encrypt` and `decrypt` are the same operation, since the
	// keystream is simply XORed with the input.
	encryptedText, err := aesCtrStep(0, enc, originalText)
	assert.NoError(t, err)
	decryptedText, err := aesCtrStep(0, enc, encryptedText)
	assert.NoError(t, err)
	assert.Equal(t, originalText, decryptedText)

	// CTR mode allows encrypting by chunks, given the correct offset of each
	// chunk from the start of the data. The result must equal the one
	// produced in a single step.
	assert.Equal(t, encryptedText, byChunks(originalText))

	// The same must hold for decrypting by chunks.
	assert.Equal(t, originalText, byChunks(encryptedText))
}
15 changes: 14 additions & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"io"
"io/ioutil"
"net"
"net/http"
"net/http/cookiejar"
"os"
"os/user"
"sort"
Expand Down Expand Up @@ -36,6 +38,8 @@ type Client struct {

defaults *hdfs.FsServerDefaultsProto
encryptionKey *hdfs.DataEncryptionKeyProto

http *http.Client
}

// ClientOptions represents the configurable options for a client.
Expand Down Expand Up @@ -203,7 +207,16 @@ func NewClient(options ClientOptions) (*Client, error) {
return nil, err
}

return &Client{namenode: namenode, options: options}, nil
// We need cookies to access KMS (required for HDFS encrypted zone).
jar, err := cookiejar.New(nil)
if err != nil {
return nil, errors.New("cant create cookie jar")
}

// Not extending ClientOptions to preserve compatibility, so timeouts not configured.
http := &http.Client{Jar: jar}

return &Client{namenode: namenode, options: options, http: http}, nil
}

// New returns Client connected to the namenode(s) specified by address, or an
Expand Down
44 changes: 44 additions & 0 deletions file_reader.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package hdfs

import (
"crypto/cipher"
"crypto/md5"
"errors"
"fmt"
Expand Down Expand Up @@ -29,6 +30,16 @@ type FileReader struct {
readdirLast string

closed bool

// encryption
enc *TransparentEncryptionInfo
}

// A TransparentEncryptionInfo holds the key and IV used to encrypt or decrypt
// file data, together with the block cipher created from that key.
type TransparentEncryptionInfo struct {
// key is the AES key; aesCtrStep passes it to aes.NewCipher.
key []byte
// iv is the initial counter block for offset 0. calculateIV rejects it
// unless it is exactly aes.BlockSize bytes long.
iv []byte
// cipher caches the block cipher built from key. aesCtrStep creates it on
// first use when it is nil.
cipher cipher.Block
}

// Open returns an FileReader which can be used for reading.
Expand All @@ -38,11 +49,25 @@ func (c *Client) Open(name string) (*FileReader, error) {
return nil, &os.PathError{"open", name, interpretException(err)}
}

status, ok := info.Sys().(*FileStatus)
if !ok {
return nil, &os.PathError{"open", name, errors.New("internal error: fail to access file status")}
}

var enc *TransparentEncryptionInfo
if status.FileEncryptionInfo != nil {
enc, err = c.kmsGetKey(status.FileEncryptionInfo)
if err != nil {
return nil, &os.PathError{"open", name, err}
}
}

return &FileReader{
client: c,
name: name,
info: info,
closed: false,
enc: enc,
}, nil
}

Expand Down Expand Up @@ -184,6 +209,25 @@ func (f *FileReader) Read(b []byte) (int, error) {
return 0, io.ErrClosedPipe
}

offset := f.offset
n, err := f.readImpl(b)

// Decrypt data chunk if file from HDFS encrypted zone.
if f.enc != nil && n > 0 {
plaintext, err := aesCtrStep(offset, f.enc, b[:n])
if err != nil {
f.offset = offset
return 0, err
}
for i := 0; i < n; i++ {
b[i] = plaintext[i]
}
}

return n, err
}

func (f *FileReader) readImpl(b []byte) (int, error) {
if f.info.IsDir() {
return 0, &os.PathError{
"read",
Expand Down
69 changes: 62 additions & 7 deletions file_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,13 @@ type FileWriter struct {
name string
replication int
blockSize int64
offset int64

blockWriter *transfer.BlockWriter
deadline time.Time

// Key and IV for transparent encryption support.
enc *TransparentEncryptionInfo
}

// Create opens a new file in HDFS with the default replication, block size,
Expand Down Expand Up @@ -62,13 +66,14 @@ func (c *Client) Create(name string) (*FileWriter, error) {
// very important that Close is called after all data has been written.
func (c *Client) CreateFile(name string, replication int, blockSize int64, perm os.FileMode) (*FileWriter, error) {
createReq := &hdfs.CreateRequestProto{
Src: proto.String(name),
Masked: &hdfs.FsPermissionProto{Perm: proto.Uint32(uint32(perm))},
ClientName: proto.String(c.namenode.ClientName),
CreateFlag: proto.Uint32(1),
CreateParent: proto.Bool(false),
Replication: proto.Uint32(uint32(replication)),
BlockSize: proto.Uint64(uint64(blockSize)),
Src: proto.String(name),
Masked: &hdfs.FsPermissionProto{Perm: proto.Uint32(uint32(perm))},
ClientName: proto.String(c.namenode.ClientName),
CreateFlag: proto.Uint32(1),
CreateParent: proto.Bool(false),
Replication: proto.Uint32(uint32(replication)),
BlockSize: proto.Uint64(uint64(blockSize)),
CryptoProtocolVersion: []hdfs.CryptoProtocolVersionProto{hdfs.CryptoProtocolVersionProto_ENCRYPTION_ZONES},
}
createResp := &hdfs.CreateResponseProto{}

Expand All @@ -77,11 +82,21 @@ func (c *Client) CreateFile(name string, replication int, blockSize int64, perm
return nil, &os.PathError{"create", name, interpretCreateException(err)}
}

var enc *TransparentEncryptionInfo
if createResp.GetFs().GetFileEncryptionInfo() != nil {
enc, err = c.kmsGetKey(createResp.GetFs().GetFileEncryptionInfo())
if err != nil {
c.Remove(name)
return nil, &os.PathError{"create", name, err}
}
}

return &FileWriter{
client: c,
name: name,
replication: replication,
blockSize: blockSize,
enc: enc,
}, nil
}

Expand All @@ -106,11 +121,21 @@ func (c *Client) Append(name string) (*FileWriter, error) {
return nil, &os.PathError{"append", name, interpretException(err)}
}

var enc *TransparentEncryptionInfo
if appendResp.GetStat().GetFileEncryptionInfo() != nil {
enc, err = c.kmsGetKey(appendResp.GetStat().GetFileEncryptionInfo())
if err != nil {
return nil, &os.PathError{"append", name, err}
}
}

f := &FileWriter{
client: c,
name: name,
replication: int(appendResp.Stat.GetBlockReplication()),
blockSize: int64(appendResp.Stat.GetBlocksize()),
offset: int64(*appendResp.Stat.Length),
enc: enc,
}

// This returns nil if there are no blocks (it's an empty file) or if the
Expand Down Expand Up @@ -176,6 +201,28 @@ func (f *FileWriter) SetDeadline(t time.Time) error {
// of this, it is important that Close is called after all data has been
// written.
func (f *FileWriter) Write(b []byte) (int, error) {
	// Files without encryption info, and empty writes, are passed through
	// to the underlying writer unchanged.
	if f.enc == nil || len(b) == 0 {
		return f.writeImpl(b)
	}

	// File is in an HDFS encrypted zone: encrypt and write the data in
	// pieces of at most aesChunkSize bytes. Each piece is encrypted at the
	// current file offset, which writeImpl advances as bytes are written.
	written := 0
	for written < len(b) {
		end := written + min(len(b)-written, aesChunkSize)
		encrypted, err := aesCtrStep(f.offset, f.enc, b[written:end])
		if err != nil {
			return written, err
		}

		n, err := f.writeImpl(encrypted)
		written += n
		if err != nil {
			return written, err
		}
	}
	return written, nil
}

func (f *FileWriter) writeImpl(b []byte) (int, error) {
if f.blockWriter == nil {
err := f.startNewBlock()
if err != nil {
Expand All @@ -187,6 +234,7 @@ func (f *FileWriter) Write(b []byte) (int, error) {
for off < len(b) {
n, err := f.blockWriter.Write(b[off:])
off += n
f.offset += int64(n)
if err == transfer.ErrEndOfBlock {
err = f.startNewBlock()
}
Expand Down Expand Up @@ -316,3 +364,10 @@ func (f *FileWriter) finalizeBlock() error {
f.blockWriter = nil
return nil
}

// min returns the smaller of the two integers a and b.
func min(a, b int) int {
	if b <= a {
		return b
	}
	return a
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ require (
github.com/jcmturner/gofork v1.0.0 // indirect
github.com/jcmturner/goidentity/v6 v6.0.1 // indirect
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
golang.org/x/crypto v0.0.0-20201112155050-0c6587e931a9 // indirect
golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZ
github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc=
github.com/pborman/getopt v1.1.0 h1:eJ3aFZroQqq0bWmraivjQNt6Dmm5M0h2JcDW38/Azb0=
github.com/pborman/getopt v1.1.0/go.mod h1:FxXoW1Re00sQG/+KIkuSqRL/LwQgSkv7uyac+STFsbk=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
Expand Down
Loading

0 comments on commit 4d0d7ec

Please sign in to comment.