Skip to content

Commit e84a236

Browse files
committed
initial encrypted zone support
1 parent 262a36a commit e84a236

9 files changed

+530
-8
lines changed

aes.go

+68
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
package hdfs
2+
3+
import (
4+
"crypto/aes"
5+
"crypto/cipher"
6+
"encoding/binary"
7+
"fmt"
8+
)
9+
10+
const (
11+
// in FileWriter we use chunks of up to aesChunkSize bytes to encrypt data
12+
aesChunkSize = 1024 * 1024
13+
)
14+
15+
// calculateIV returns the AES-CTR counter block to use at the given byte
// offset of a stream whose first block is encrypted with initIV.
//
// It is based on calculateIV from AesCtrCryptoCodec.java: initIV is treated
// as a 128-bit big-endian integer and the number of whole AES blocks that
// precede offset (offset / aes.BlockSize) is added to it, carrying from the
// low 64 bits into the high 64 bits. The position inside the block
// (offset % aes.BlockSize) is not reflected in the result; the caller has to
// skip that many keystream bytes itself.
//
// initIV is not modified. An error is returned if initIV is not exactly
// aes.BlockSize bytes long or if offset is negative.
func calculateIV(offset int64, initIV []byte) ([]byte, error) {
	if len(initIV) != aes.BlockSize {
		return nil, fmt.Errorf("calculateIV: invalid iv size: %v", len(initIV))
	}
	if offset < 0 {
		// A negative offset would be converted to a huge unsigned counter
		// below and silently produce a wrong IV.
		return nil, fmt.Errorf("calculateIV: invalid offset: %v", offset)
	}

	counter := uint64(offset / aes.BlockSize)

	high := binary.BigEndian.Uint64(initIV[:8])
	low := binary.BigEndian.Uint64(initIV[8:])

	sum := low + counter
	if sum < low { // unsigned overflow of the low half: carry into high
		high++
	}

	iv := make([]byte, aes.BlockSize)
	binary.BigEndian.PutUint64(iv[:8], high)
	binary.BigEndian.PutUint64(iv[8:], sum)

	return iv, nil
}
39+
40+
// aesCtrStep perform AES-CTR XOR operation on given byte string.
41+
// Once encryption and decryption are exactly the same operation for CTR mode,
42+
// this function can be used to perform both.
43+
func aesCtrStep(offset int64, enc *transparentEncryptionInfo, b []byte) ([]byte, error) {
44+
iv, err := calculateIV(offset, enc.iv)
45+
if err != nil {
46+
return nil, err
47+
}
48+
49+
if enc.cipher == nil {
50+
cipher, err := aes.NewCipher(enc.key)
51+
if err != nil {
52+
return nil, err
53+
}
54+
enc.cipher = cipher
55+
}
56+
57+
stream := cipher.NewCTR(enc.cipher, iv)
58+
59+
padding := offset % aes.BlockSize
60+
if padding > 0 {
61+
tmp := make([]byte, padding)
62+
stream.XORKeyStream(tmp, tmp)
63+
}
64+
65+
text := make([]byte, len(b))
66+
stream.XORKeyStream(text, b)
67+
return text, nil
68+
}

aes_test.go

+50
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package hdfs
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/assert"
7+
)
8+
9+
func TestAesChunks(t *testing.T) {
10+
originalText := []byte("some random plain text, nice to have it quite long")
11+
key := []byte("0123456789abcdef")
12+
13+
// Choose iv to hit counter overflow.
14+
iv := []byte("\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\xff\xff\xff\xff\xff\xf5")
15+
enc := &transparentEncryptionInfo{iv: iv, key: key}
16+
17+
// Ensure that we can decrypt text after encryption.
18+
// In CTR mode, implementation for `encrypt` and `decrypt` actually the same
19+
// since we just XOR on input.
20+
encryptedText, err := aesCtrStep(0, enc, originalText)
21+
assert.Equal(t, err, nil)
22+
decryptedText, err := aesCtrStep(0, enc, encryptedText)
23+
assert.Equal(t, err, nil)
24+
assert.Equal(t, originalText, decryptedText)
25+
26+
// CTR mode allow us to encrypt/decrypt string by chunks
27+
// (using correct offset from start of string).
28+
// Ensure that result equal to one, produced in one step.
29+
encryptedByChunks := make([]byte, 0)
30+
var pos int64 = 0
31+
for _, x := range []int{5, 7, 6, 4, 28} {
32+
tmp, err := aesCtrStep(pos, enc, originalText[pos:pos+int64(x)])
33+
assert.Equal(t, err, nil)
34+
encryptedByChunks = append(encryptedByChunks, tmp...)
35+
pos += int64(x)
36+
}
37+
assert.Equal(t, encryptedByChunks, encryptedText)
38+
39+
// Decrypt string by chunks.
40+
// Ensure that result equal to one, produced in one step.
41+
decryptedByChunks := make([]byte, 0)
42+
pos = 0
43+
for _, x := range []int{5, 7, 6, 4, 28} {
44+
tmp, err := aesCtrStep(pos, enc, encryptedText[pos:pos+int64(x)])
45+
assert.Equal(t, err, nil)
46+
decryptedByChunks = append(decryptedByChunks, tmp...)
47+
pos += int64(x)
48+
}
49+
assert.Equal(t, decryptedByChunks, originalText)
50+
}

client.go

+14-1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ import (
66
"io"
77
"io/ioutil"
88
"net"
9+
"net/http"
10+
"net/http/cookiejar"
911
"os"
1012
"os/user"
1113
"sort"
@@ -36,6 +38,8 @@ type Client struct {
3638

3739
defaults *hdfs.FsServerDefaultsProto
3840
encryptionKey *hdfs.DataEncryptionKeyProto
41+
42+
http *http.Client
3943
}
4044

4145
// ClientOptions represents the configurable options for a client.
@@ -203,7 +207,16 @@ func NewClient(options ClientOptions) (*Client, error) {
203207
return nil, err
204208
}
205209

206-
return &Client{namenode: namenode, options: options}, nil
210+
// We need cookies to access KMS (required for HDFS encrypted zone).
211+
jar, err := cookiejar.New(nil)
212+
if err != nil {
213+
return nil, errors.New("cant create cookie jar")
214+
}
215+
216+
// Not extending ClientOptions to preserve compatibility, so timeouts not configured.
217+
http := &http.Client{Jar: jar}
218+
219+
return &Client{namenode: namenode, options: options, http: http}, nil
207220
}
208221

209222
// New returns Client connected to the namenode(s) specified by address, or an

file_reader.go

+44
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package hdfs
22

33
import (
4+
"crypto/cipher"
45
"crypto/md5"
56
"errors"
67
"fmt"
@@ -29,6 +30,16 @@ type FileReader struct {
2930
readdirLast string
3031

3132
closed bool
33+
34+
// encryption
35+
enc *transparentEncryptionInfo
36+
}
37+
38+
// transparentEncryptionInfo bundles what is needed to encrypt or decrypt the
// data of one file: its key, its IV and the block cipher built from the key.
type transparentEncryptionInfo struct {
	// key is the raw key passed to aes.NewCipher; iv is the initial IV from
	// which calculateIV derives the counter block for a given offset.
	key, iv []byte

	// cipher is the AES block cipher for key. It may be nil, in which case
	// aesCtrStep creates it on first use and stores it here.
	cipher cipher.Block
}
3344

3445
// Open returns an FileReader which can be used for reading.
@@ -38,11 +49,25 @@ func (c *Client) Open(name string) (*FileReader, error) {
3849
return nil, &os.PathError{"open", name, interpretException(err)}
3950
}
4051

52+
status, ok := info.Sys().(*FileStatus)
53+
if !ok {
54+
return nil, &os.PathError{"open", name, errors.New("internal error: fail to access file status")}
55+
}
56+
57+
var enc *transparentEncryptionInfo
58+
if status.FileEncryptionInfo != nil {
59+
enc, err = c.kmsGetKey(status.FileEncryptionInfo)
60+
if err != nil {
61+
return nil, &os.PathError{"open", name, err}
62+
}
63+
}
64+
4165
return &FileReader{
4266
client: c,
4367
name: name,
4468
info: info,
4569
closed: false,
70+
enc: enc,
4671
}, nil
4772
}
4873

@@ -184,6 +209,25 @@ func (f *FileReader) Read(b []byte) (int, error) {
184209
return 0, io.ErrClosedPipe
185210
}
186211

212+
offset := f.offset
213+
n, err := f.readImpl(b)
214+
215+
// Decrypt data chunk if file from HDFS encrypted zone.
216+
if f.enc != nil && n > 0 {
217+
plaintext, err := aesCtrStep(offset, f.enc, b[:n])
218+
if err != nil {
219+
f.offset = offset
220+
return 0, err
221+
}
222+
for i := 0; i < n; i++ {
223+
b[i] = plaintext[i]
224+
}
225+
}
226+
227+
return n, err
228+
}
229+
230+
func (f *FileReader) readImpl(b []byte) (int, error) {
187231
if f.info.IsDir() {
188232
return 0, &os.PathError{
189233
"read",

file_writer.go

+62-7
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,13 @@ type FileWriter struct {
2828
replication int
2929
blockSize int64
3030
fileId *uint64
31+
offset int64
3132

3233
blockWriter *transfer.BlockWriter
3334
deadline time.Time
35+
36+
// Key and IV for transparent encryption support.
37+
enc *transparentEncryptionInfo
3438
}
3539

3640
// Create opens a new file in HDFS with the default replication, block size,
@@ -63,13 +67,14 @@ func (c *Client) Create(name string) (*FileWriter, error) {
6367
// very important that Close is called after all data has been written.
6468
func (c *Client) CreateFile(name string, replication int, blockSize int64, perm os.FileMode) (*FileWriter, error) {
6569
createReq := &hdfs.CreateRequestProto{
66-
Src: proto.String(name),
67-
Masked: &hdfs.FsPermissionProto{Perm: proto.Uint32(uint32(perm))},
68-
ClientName: proto.String(c.namenode.ClientName),
69-
CreateFlag: proto.Uint32(1),
70-
CreateParent: proto.Bool(false),
71-
Replication: proto.Uint32(uint32(replication)),
72-
BlockSize: proto.Uint64(uint64(blockSize)),
70+
Src: proto.String(name),
71+
Masked: &hdfs.FsPermissionProto{Perm: proto.Uint32(uint32(perm))},
72+
ClientName: proto.String(c.namenode.ClientName),
73+
CreateFlag: proto.Uint32(1),
74+
CreateParent: proto.Bool(false),
75+
Replication: proto.Uint32(uint32(replication)),
76+
BlockSize: proto.Uint64(uint64(blockSize)),
77+
CryptoProtocolVersion: []hdfs.CryptoProtocolVersionProto{hdfs.CryptoProtocolVersionProto_ENCRYPTION_ZONES},
7378
}
7479
createResp := &hdfs.CreateResponseProto{}
7580

@@ -78,12 +83,22 @@ func (c *Client) CreateFile(name string, replication int, blockSize int64, perm
7883
return nil, &os.PathError{"create", name, interpretCreateException(err)}
7984
}
8085

86+
var enc *transparentEncryptionInfo
87+
if createResp.GetFs().GetFileEncryptionInfo() != nil {
88+
enc, err = c.kmsGetKey(createResp.GetFs().GetFileEncryptionInfo())
89+
if err != nil {
90+
c.Remove(name)
91+
return nil, &os.PathError{"create", name, err}
92+
}
93+
}
94+
8195
return &FileWriter{
8296
client: c,
8397
name: name,
8498
replication: replication,
8599
blockSize: blockSize,
86100
fileId: createResp.Fs.FileId,
101+
enc: enc,
87102
}, nil
88103
}
89104

@@ -108,12 +123,22 @@ func (c *Client) Append(name string) (*FileWriter, error) {
108123
return nil, &os.PathError{"append", name, interpretException(err)}
109124
}
110125

126+
var enc *transparentEncryptionInfo
127+
if appendResp.GetStat().GetFileEncryptionInfo() != nil {
128+
enc, err = c.kmsGetKey(appendResp.GetStat().GetFileEncryptionInfo())
129+
if err != nil {
130+
return nil, &os.PathError{"append", name, err}
131+
}
132+
}
133+
111134
f := &FileWriter{
112135
client: c,
113136
name: name,
114137
replication: int(appendResp.Stat.GetBlockReplication()),
115138
blockSize: int64(appendResp.Stat.GetBlocksize()),
116139
fileId: appendResp.Stat.FileId,
140+
offset: int64(*appendResp.Stat.Length),
141+
enc: enc,
117142
}
118143

119144
// This returns nil if there are no blocks (it's an empty file) or if the
@@ -179,6 +204,28 @@ func (f *FileWriter) SetDeadline(t time.Time) error {
179204
// of this, it is important that Close is called after all data has been
180205
// written.
181206
func (f *FileWriter) Write(b []byte) (int, error) {
207+
// Encrypt data chunk if file in HDFS encrypted zone.
208+
if f.enc != nil && len(b) > 0 {
209+
var offset int
210+
for offset < len(b) {
211+
size := min(len(b)-offset, aesChunkSize)
212+
ciphertext, err := aesCtrStep(f.offset, f.enc, b[offset:offset+size])
213+
if err != nil {
214+
return offset, err
215+
}
216+
writtenSize, err := f.writeImpl(ciphertext)
217+
offset += writtenSize
218+
if err != nil {
219+
return offset, err
220+
}
221+
}
222+
return offset, nil
223+
} else {
224+
return f.writeImpl(b)
225+
}
226+
}
227+
228+
func (f *FileWriter) writeImpl(b []byte) (int, error) {
182229
if f.blockWriter == nil {
183230
err := f.startNewBlock()
184231
if err != nil {
@@ -190,6 +237,7 @@ func (f *FileWriter) Write(b []byte) (int, error) {
190237
for off < len(b) {
191238
n, err := f.blockWriter.Write(b[off:])
192239
off += n
240+
f.offset += int64(n)
193241
if err == transfer.ErrEndOfBlock {
194242
err = f.startNewBlock()
195243
}
@@ -321,3 +369,10 @@ func (f *FileWriter) finalizeBlock() error {
321369
f.blockWriter = nil
322370
return nil
323371
}
372+
373+
// min returns the smaller of the two integers a and b.
func min(a, b int) int {
	if b <= a {
		return b
	}
	return a
}

go.mod

+1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ require (
1515
github.com/jcmturner/gofork v1.0.0 // indirect
1616
github.com/jcmturner/goidentity/v6 v6.0.1 // indirect
1717
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
18+
github.com/pkg/errors v0.9.1 // indirect
1819
github.com/pmezard/go-difflib v1.0.0 // indirect
1920
golang.org/x/crypto v0.0.0-20201112155050-0c6587e931a9 // indirect
2021
golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa // indirect

go.sum

+2
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZ
2323
github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc=
2424
github.com/pborman/getopt v1.1.0 h1:eJ3aFZroQqq0bWmraivjQNt6Dmm5M0h2JcDW38/Azb0=
2525
github.com/pborman/getopt v1.1.0/go.mod h1:FxXoW1Re00sQG/+KIkuSqRL/LwQgSkv7uyac+STFsbk=
26+
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
27+
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
2628
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
2729
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
2830
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=

0 commit comments

Comments
 (0)