Skip to content

Commit

Permalink
Add encrypt password config support (#857) (#868)
Browse files Browse the repository at this point in the history
* Add encrypt password support

also add a command to binlogctl, usage example:
./binlogctl -cmd encrypt -text aaa
  • Loading branch information
july2993 authored and WangXiangUSTC committed Jan 7, 2020
1 parent 1874f90 commit c97e501
Show file tree
Hide file tree
Showing 9 changed files with 347 additions and 11 deletions.
7 changes: 6 additions & 1 deletion binlogctl/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ const (

// OfflineDrainer is comamnd used for offlien drainer.
OfflineDrainer = "offline-drainer"

// Encrypt is command used for encrypt password.
Encrypt = "encrypt"
)

// Config holds the configuration of drainer
Expand All @@ -74,6 +77,7 @@ type Config struct {
SSLKey string `toml:"ssl-key" json:"ssl-key"`
State string `toml:"state" json:"state"`
ShowOfflineNodes bool `toml:"state" json:"show-offline-nodes"`
Text string `toml:"text" json:"text"`
tls *tls.Config
printVersion bool
}
Expand All @@ -83,7 +87,7 @@ func NewConfig() *Config {
cfg := &Config{}
cfg.FlagSet = flag.NewFlagSet("binlogctl", flag.ContinueOnError)

cfg.FlagSet.StringVar(&cfg.Command, "cmd", "pumps", "operator: \"generate_meta\", \"pumps\", \"drainers\", \"update-pump\", \"update-drainer\", \"pause-pump\", \"pause-drainer\", \"offline-pump\", \"offline-drainer\"")
cfg.FlagSet.StringVar(&cfg.Command, "cmd", "pumps", "operator: \"generate_meta\", \"pumps\", \"drainers\", \"update-pump\", \"update-drainer\", \"pause-pump\", \"pause-drainer\", \"offline-pump\", \"offline-drainer\", \"encrypt\"")
cfg.FlagSet.StringVar(&cfg.NodeID, "node-id", "", "id of node, use to update some node with operation update-pump, update-drainer, pause-pump, pause-drainer, offline-pump and offline-drainer")
cfg.FlagSet.StringVar(&cfg.DataDir, "data-dir", defaultDataDir, "meta directory path")
cfg.FlagSet.StringVar(&cfg.EtcdURLs, "pd-urls", defaultEtcdURLs, "a comma separated list of PD endpoints")
Expand All @@ -93,6 +97,7 @@ func NewConfig() *Config {
cfg.FlagSet.StringVar(&cfg.TimeZone, "time-zone", "", "set time zone if you want save time info in savepoint file, for example `Asia/Shanghai` for CST time, `Local` for local time")
cfg.FlagSet.StringVar(&cfg.State, "state", "", "set node's state, can set to online, pausing, paused, closing or offline.")
cfg.FlagSet.BoolVar(&cfg.ShowOfflineNodes, "show-offline-nodes", false, "include offline nodes when querying pumps/drainers")
cfg.FlagSet.StringVar(&cfg.Text, "text", "", "text to be encrypt when using encrypt command")
cfg.FlagSet.BoolVar(&cfg.printVersion, "V", false, "prints version and exit")

return cfg
Expand Down
18 changes: 18 additions & 0 deletions binlogctl/encrypt.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package binlogctl

import (
"github.com/pingcap/log"
"github.com/pingcap/tidb-binlog/pkg/encrypt"
"go.uber.org/zap"
)

// EncryptHandler log the encrypted text if success or return error.
func EncryptHandler(text string) error {
enc, err := encrypt.Encrypt(text)
if err != nil {
return err
}

log.Info("encrypt text", zap.String("encrypted", string(enc)))
return nil
}
6 changes: 6 additions & 0 deletions cmd/binlogctl/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ func main() {
err = ctl.ApplyAction(cfg.EtcdURLs, node.PumpNode, cfg.NodeID, close)
case ctl.OfflineDrainer:
err = ctl.ApplyAction(cfg.EtcdURLs, node.DrainerNode, cfg.NodeID, close)
case ctl.Encrypt:
if len(cfg.Text) == 0 {
err = errors.New("need to specify the text to be encrypt")
} else {
err = ctl.EncryptHandler(cfg.Text)
}
default:
err = errors.NotSupportedf("cmd %s", cfg.Command)
}
Expand Down
4 changes: 4 additions & 0 deletions cmd/drainer/drainer.toml
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ ignore-schemas = "INFORMATION_SCHEMA,PERFORMANCE_SCHEMA,mysql"
host = "127.0.0.1"
user = "root"
password = ""
# if encrypted_password is not empty, password will be ignored.
encrypted_password = ""
port = 3306

[syncer.to.checkpoint]
Expand All @@ -93,6 +95,8 @@ port = 3306
# schema = "tidb_binlog"
# host = "127.0.0.1"
# user = "root"
# if encrypted_password is not empty, password will be ignored.
# encrypted_password = ""
# password = ""
# port = 3306

Expand Down
20 changes: 19 additions & 1 deletion drainer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"go.uber.org/zap"

dsync "github.com/pingcap/tidb-binlog/drainer/sync"
"github.com/pingcap/tidb-binlog/pkg/encrypt"
"github.com/pingcap/tidb-binlog/pkg/filter"
"github.com/pingcap/tidb-binlog/pkg/flags"
"github.com/pingcap/tidb-binlog/pkg/security"
Expand Down Expand Up @@ -374,11 +375,28 @@ func (cfg *Config) adjustConfig() error {
}
cfg.SyncerCfg.To.User = user
}
if len(cfg.SyncerCfg.To.Password) == 0 {

if len(cfg.SyncerCfg.To.EncryptedPassword) > 0 {
decrypt, err := encrypt.Decrypt(cfg.SyncerCfg.To.EncryptedPassword)
if err != nil {
return errors.Annotate(err, "failed to decrypt password in `to.encrypted_password`")
}

cfg.SyncerCfg.To.Password = decrypt
} else if len(cfg.SyncerCfg.To.Password) == 0 {
cfg.SyncerCfg.To.Password = os.Getenv("MYSQL_PSWD")
}
}

if len(cfg.SyncerCfg.To.Checkpoint.EncryptedPassword) > 0 {
decrypt, err := encrypt.Decrypt(cfg.SyncerCfg.To.EncryptedPassword)
if err != nil {
return errors.Annotate(err, "failed to decrypt password in `checkpoint.encrypted_password`")
}

cfg.SyncerCfg.To.Checkpoint.Password = decrypt
}

cfg.SyncerCfg.adjustWorkCount()
cfg.SyncerCfg.adjustDoDBAndTable()

Expand Down
30 changes: 30 additions & 0 deletions drainer/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ import (
"time"

"github.com/BurntSushi/toml"
"github.com/pingcap/check"
. "github.com/pingcap/check"
"github.com/pingcap/parser/mysql"
dsync "github.com/pingcap/tidb-binlog/drainer/sync"
"github.com/pingcap/tidb-binlog/pkg/encrypt"
"github.com/pingcap/tidb-binlog/pkg/filter"
"github.com/pingcap/tidb-binlog/pkg/util"
pkgzk "github.com/pingcap/tidb-binlog/pkg/zk"
Expand Down Expand Up @@ -151,13 +153,41 @@ func (t *testDrainerSuite) TestAdjustConfig(c *C) {
c.Assert(cfg.ListenAddr, Equals, "http://"+util.DefaultListenAddr(8249))
c.Assert(cfg.AdvertiseAddr, Equals, cfg.ListenAddr)

// test EncryptedPassword
cfg = NewConfig()
cfg.ListenAddr = "0.0.0.0:8257"
cfg.AdvertiseAddr = "192.168.15.12:8257"
err = cfg.adjustConfig()
c.Assert(err, IsNil)
c.Assert(cfg.ListenAddr, Equals, "http://0.0.0.0:8257")
c.Assert(cfg.AdvertiseAddr, Equals, "http://192.168.15.12:8257")

cfg = NewConfig()
encrypted, err := encrypt.Encrypt("origin")
c.Assert(err, IsNil)

cfg.SyncerCfg.To = &dsync.DBConfig{
EncryptedPassword: string(encrypted),
Checkpoint: dsync.CheckpointConfig{
EncryptedPassword: string(encrypted),
},
}
err = cfg.adjustConfig()
c.Assert(err, IsNil)
c.Assert(cfg.SyncerCfg.To.Password, check.Equals, "origin")
c.Assert(cfg.SyncerCfg.To.Checkpoint.Password, check.Equals, "origin")

// test false positive
cfg.SyncerCfg.To = &dsync.DBConfig{
EncryptedPassword: "what ever" + string(encrypted),
Checkpoint: dsync.CheckpointConfig{
EncryptedPassword: "what ever" + string(encrypted),
},
}

c.Logf("to.password: %v", cfg.SyncerCfg.To.Password)
err = cfg.adjustConfig()
c.Assert(err, NotNil)
}

func (t *testDrainerSuite) TestConfigParsingFileWithInvalidOptions(c *C) {
Expand Down
22 changes: 13 additions & 9 deletions drainer/sync/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,16 @@ import (

// DBConfig is the DB configuration.
type DBConfig struct {
Host string `toml:"host" json:"host"`
User string `toml:"user" json:"user"`
Password string `toml:"password" json:"password"`
Port int `toml:"port" json:"port"`
Checkpoint CheckpointConfig `toml:"checkpoint" json:"checkpoint"`
BinlogFileDir string `toml:"dir" json:"dir"`
TimeLimit string `toml:"time-limit" json:"time-limit"`
SizeLimit string `toml:"size-limit" json:"size-limit"`
Host string `toml:"host" json:"host"`
User string `toml:"user" json:"user"`
Password string `toml:"password" json:"password"`
// if EncryptedPassword is not empty, Password will be ignore.
EncryptedPassword string `toml:"encrypted_password" json:"encrypted_password"`
Port int `toml:"port" json:"port"`
Checkpoint CheckpointConfig `toml:"checkpoint" json:"checkpoint"`
BinlogFileDir string `toml:"dir" json:"dir"`
TimeLimit string `toml:"time-limit" json:"time-limit"`
SizeLimit string `toml:"size-limit" json:"size-limit"`

ZKAddrs string `toml:"zookeeper-addrs" json:"zookeeper-addrs"`
KafkaAddrs string `toml:"kafka-addrs" json:"kafka-addrs"`
Expand All @@ -45,7 +47,9 @@ type CheckpointConfig struct {
Host string `toml:"host" json:"host"`
User string `toml:"user" json:"user"`
Password string `toml:"password" json:"password"`
Port int `toml:"port" json:"port"`
// if EncryptedPassword is not empty, Password will be ignore.
EncryptedPassword string `toml:"encrypted_password" json:"encrypted_password"`
Port int `toml:"port" json:"port"`
}

type baseError struct {
Expand Down
159 changes: 159 additions & 0 deletions pkg/encrypt/encrypt.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package encrypt

import (
"bytes"
"crypto/aes"
"crypto/cipher"
"crypto/rand"
"encoding/base64"
"encoding/hex"
"os"
"sync"

"github.com/pingcap/errors"
"github.com/pingcap/log"
)

var (
defaultSecretKey, _ = hex.DecodeString("a529b7665997f043a30ac8fadcb51d6aa032c226ab5b7750530b12b8c1a16a48")
secretKey []byte
ivSep = []byte("@") // ciphertext format: iv + ivSep + encrypted-plaintext
)

var initSecretKeyOnce sync.Once
var initSecretKeyErr error

func initSecretKey() error {
hexKey := os.Getenv("BINLOG_SECRET_KEY")
if len(hexKey) == 0 {
log.Warn("use the default secret key to encrypt")
secretKey = defaultSecretKey
return nil
}

key, err := hex.DecodeString(hexKey)
if err != nil {
return errors.Trace(err)
}

return SetSecretKey(key)
}

// SetSecretKey sets the secret key which used to encrypt
func SetSecretKey(key []byte) error {
switch len(key) {
case 16, 24, 32:
break
default:
return errors.Errorf("secretKey not valid: %v", key)
}
secretKey = key
return nil
}

// Encrypt tries to encrypt plaintext to base64 encoded ciphertext
func Encrypt(plaintext string) (string, error) {
ciphertext, err := encrypt([]byte(plaintext))
if err != nil {
return "", err
}

return base64.StdEncoding.EncodeToString(ciphertext), nil
}

// Decrypt tries to decrypt base64 encoded ciphertext to plaintext
func Decrypt(ciphertextB64 string) (string, error) {
ciphertext, err := base64.StdEncoding.DecodeString(ciphertextB64)
if err != nil {
return "", errors.Annotatef(err, "base 64 failed to decode: %s", ciphertext)
}

plaintext, err := decrypt(ciphertext)
if err != nil {
return "", errors.Trace(err)
}
return string(plaintext), nil
}

// encrypt encrypts plaintext to ciphertext
func encrypt(plaintext []byte) ([]byte, error) {
initSecretKeyOnce.Do(func() {
initSecretKeyErr = initSecretKey()
})
if initSecretKeyErr != nil {
return nil, initSecretKeyErr
}

block, err := aes.NewCipher(secretKey)
if err != nil {
return nil, errors.Trace(err)
}

iv, err := genIV(block.BlockSize())
if err != nil {
return nil, err
}

ciphertext := make([]byte, 0, len(iv)+len(ivSep)+len(plaintext))
ciphertext = append(ciphertext, iv...)
ciphertext = append(ciphertext, ivSep...)
ciphertext = append(ciphertext, plaintext...) // will be overwrite by XORKeyStream

stream := cipher.NewCFBEncrypter(block, iv)
stream.XORKeyStream(ciphertext[len(iv)+len(ivSep):], plaintext)

return ciphertext, nil
}

// decrypt decrypts ciphertext to plaintext
func decrypt(ciphertext []byte) ([]byte, error) {
initSecretKeyOnce.Do(func() {
initSecretKeyErr = initSecretKey()
})
if initSecretKeyErr != nil {
return nil, initSecretKeyErr
}

block, err := aes.NewCipher(secretKey)
if err != nil {
return nil, err
}

if len(ciphertext) < block.BlockSize()+len(ivSep) {
// return nil, terror.ErrCiphertextLenNotValid.Generate(block.BlockSize()+len(ivSep), len(ciphertext))
return nil, errors.Errorf("ciphertext not valid")
}

if !bytes.Equal(ciphertext[block.BlockSize():block.BlockSize()+len(ivSep)], ivSep) {
// return nil, terror.ErrCiphertextContextNotValid.Generate()
return nil, errors.Errorf("ciphertext not valid")
}

iv := ciphertext[:block.BlockSize()]
ciphertext = ciphertext[block.BlockSize()+len(ivSep):]
plaintext := make([]byte, len(ciphertext))

stream := cipher.NewCFBDecrypter(block, iv)
stream.XORKeyStream(plaintext, ciphertext)

return plaintext, nil
}

func genIV(n int) ([]byte, error) {
b := make([]byte, n)
_, err := rand.Read(b)
return b, errors.Trace(err)
}
Loading

0 comments on commit c97e501

Please sign in to comment.