Skip to content

Commit

Permalink
Support advertise-addr in drainer and add check for empty addrs (#644)
Browse files Browse the repository at this point in the history
  • Loading branch information
suzaku authored Jun 21, 2019
1 parent 37fa0a0 commit 65f9434
Show file tree
Hide file tree
Showing 7 changed files with 87 additions and 22 deletions.
5 changes: 4 additions & 1 deletion cmd/drainer/drainer.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@

# addr (i.e. 'host:port') to listen on for drainer connections
# will register this addr into etcd
# addr = "127.0.0.1:8249"
addr = "127.0.0.1:8249"

# addr(i.e. 'host:port') to advertise to the public
advertise-addr = ""

# the interval time (in seconds) of detect pumps' status
detect-interval = 10
Expand Down
39 changes: 26 additions & 13 deletions drainer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ type Config struct {
*flag.FlagSet `json:"-"`
LogLevel string `toml:"log-level" json:"log-level"`
ListenAddr string `toml:"addr" json:"addr"`
AdvertiseAddr string `toml:"advertise-addr" json:"advertise-addr"`
DataDir string `toml:"data-dir" json:"data-dir"`
DetectInterval int `toml:"detect-interval" json:"detect-interval"`
EtcdURLs string `toml:"pd-urls" json:"pd-urls"`
Expand Down Expand Up @@ -97,6 +98,7 @@ func NewConfig() *Config {
fs.PrintDefaults()
}
fs.StringVar(&cfg.ListenAddr, "addr", util.DefaultListenAddr(8249), "addr (i.e. 'host:port') to listen on for drainer connections")
fs.StringVar(&cfg.AdvertiseAddr, "advertise-addr", "", "addr(i.e. 'host:port') to advertise to the public, default to be the same value as -addr")
fs.StringVar(&cfg.DataDir, "data-dir", defaultDataDir, "drainer data directory path (default data.drainer)")
fs.IntVar(&cfg.DetectInterval, "detect-interval", defaultDetectInterval, "the interval time (in seconds) of detect pumps' status")
fs.StringVar(&cfg.EtcdURLs, "pd-urls", defaultEtcdURLs, "a comma separated list of PD endpoints")
Expand Down Expand Up @@ -222,19 +224,11 @@ func adjustInt(v *int, defValue int) {

// validate checks whether the configuration is valid
func (cfg *Config) validate() error {
// check ListenAddr
urllis, err := url.Parse(cfg.ListenAddr)
if err != nil {
return errors.Errorf("parse ListenAddr error: %s, %v", cfg.ListenAddr, err)
}

var host string
if host, _, err = net.SplitHostPort(urllis.Host); err != nil {
return errors.Errorf("bad ListenAddr host format: %s, %v", urllis.Host, err)
if err := validateAddr(cfg.ListenAddr); err != nil {
return errors.Annotate(err, "invalid addr")
}

if !util.IsValidateListenHost(host) {
log.Fatal("drainer listen on: %v and will register this ip into etcd, pumb must access drainer, change the listen addr config", host)
if err := validateAddr(cfg.AdvertiseAddr); err != nil {
return errors.Annotate(err, "invalid advertise-addr")
}

// check EtcdEndpoints
Expand Down Expand Up @@ -268,7 +262,9 @@ func (cfg *Config) validate() error {
func (cfg *Config) adjustConfig() error {
// adjust configuration
adjustString(&cfg.ListenAddr, util.DefaultListenAddr(8249))
cfg.ListenAddr = "http://" + cfg.ListenAddr // add 'http:' scheme to facilitate parsing
adjustString(&cfg.AdvertiseAddr, cfg.ListenAddr)
cfg.ListenAddr = "http://" + cfg.ListenAddr // add 'http:' scheme to facilitate parsing
cfg.AdvertiseAddr = "http://" + cfg.AdvertiseAddr // add 'http:' scheme to facilitate parsing
adjustString(&cfg.DataDir, defaultDataDir)
adjustInt(&cfg.DetectInterval, defaultDetectInterval)

Expand Down Expand Up @@ -353,3 +349,20 @@ func (cfg *Config) adjustConfig() error {

return nil
}

func validateAddr(addr string) error {
urllis, err := url.Parse(addr)
if err != nil {
return errors.Annotatef(err, "failed to parse addr %v", addr)
}

var host string
if host, _, err = net.SplitHostPort(urllis.Host); err != nil {
return errors.Annotatef(err, "invalid host %v", urllis.Host)
}

if !util.IsValidateListenHost(host) {
log.Warnf("pump may not be able to access drainer using this addr: %s", addr)
}
return nil
}
27 changes: 24 additions & 3 deletions drainer/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

. "github.com/pingcap/check"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb-binlog/pkg/util"
)

// Hook up gocheck into the "go test" runner.
Expand All @@ -16,17 +17,19 @@ type testDrainerSuite struct{}

func (t *testDrainerSuite) TestConfig(c *C) {
args := []string{
"-metrics-addr", "127.0.0.1:9091",
"-metrics-addr", "192.168.15.10:9091",
"-txn-batch", "1",
"-data-dir", "data.drainer",
"-dest-db-type", "mysql",
"-config", "../cmd/drainer/drainer.toml",
"-addr", "192.168.15.10:8257",
"-advertise-addr", "192.168.15.10:8257",
}

cfg := NewConfig()
err := cfg.Parse(args)
c.Assert(err, IsNil)
c.Assert(cfg.MetricsAddr, Equals, "127.0.0.1:9091")
c.Assert(cfg.MetricsAddr, Equals, "192.168.15.10:9091")
c.Assert(cfg.DataDir, Equals, "data.drainer")
c.Assert(cfg.SyncerCfg.TxnBatch, Equals, 1)
c.Assert(cfg.SyncerCfg.DestDBType, Equals, "mysql")
Expand All @@ -41,9 +44,13 @@ func (t *testDrainerSuite) TestValidate(c *C) {

cfg.ListenAddr = "http://123:9091"
err := cfg.validate()
c.Assert(err, ErrorMatches, ".*ListenAddr.*")
c.Assert(err, ErrorMatches, ".*invalid addr.*")

cfg.ListenAddr = "http://192.168.10.12:9091"
err = cfg.validate()
c.Assert(err, ErrorMatches, ".*invalid advertise-addr.*")

cfg.AdvertiseAddr = "http://192.168.10.12:9091"
cfg.EtcdURLs = "127.0.0.1:2379,127.0.0.1:2380"
err = cfg.validate()
c.Assert(err, ErrorMatches, ".*EtcdURLs.*")
Expand All @@ -68,4 +75,18 @@ func (t *testDrainerSuite) TestAdjustConfig(c *C) {
c.Assert(cfg.SyncerCfg.DestDBType, Equals, "file")
c.Assert(cfg.SyncerCfg.WorkerCount, Equals, 1)
c.Assert(cfg.SyncerCfg.DisableDispatch, IsTrue)

cfg = NewConfig()
err = cfg.adjustConfig()
c.Assert(err, IsNil)
c.Assert(cfg.ListenAddr, Equals, "http://"+util.DefaultListenAddr(8249))
c.Assert(cfg.AdvertiseAddr, Equals, cfg.ListenAddr)

cfg = NewConfig()
cfg.ListenAddr = "0.0.0.0:8257"
cfg.AdvertiseAddr = "192.168.15.12:8257"
err = cfg.adjustConfig()
c.Assert(err, IsNil)
c.Assert(cfg.ListenAddr, Equals, "http://0.0.0.0:8257")
c.Assert(cfg.AdvertiseAddr, Equals, "http://192.168.15.12:8257")
}
4 changes: 2 additions & 2 deletions drainer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,9 @@ func NewServer(cfg *Config) (*Server, error) {
}
}

advURL, err := url.Parse(cfg.ListenAddr)
advURL, err := url.Parse(cfg.AdvertiseAddr)
if err != nil {
return nil, errors.Annotatef(err, "invalid configuration of advertise addr(%s)", cfg.ListenAddr)
return nil, errors.Annotatef(err, "invalid configuration of advertise addr(%s)", cfg.AdvertiseAddr)
}

status := node.NewStatus(ID, advURL.Host, node.Online, 0, syncer.GetLatestCommitTS(), util.GetApproachTS(latestTS, latestTime))
Expand Down
7 changes: 6 additions & 1 deletion pkg/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,14 @@ func DefaultListenAddr(port int32) string {

// IsValidateListenHost judge the host is validate listen host or not.
func IsValidateListenHost(host string) bool {
if host == "127.0.0.1" || host == "localhost" || host == "0.0.0.0" {
if len(host) == 0 {
return false
}
if ip := net.ParseIP(host); ip != nil {
if ip.IsLoopback() {
return false
}
}
return true
}

Expand Down
4 changes: 2 additions & 2 deletions pump/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,8 +209,8 @@ func (cfg *Config) validate() error {
if err != nil {
return errors.Errorf("bad AdvertiseAddr host format: %s, %v", urladv.Host, err)
}
if host == "0.0.0.0" {
return errors.New("advertiseAddr host is not allowed to be set to 0.0.0.0")
if len(host) == 0 || host == "0.0.0.0" {
return errors.Errorf("invalid advertiseAddr host: %v", host)
}

// check socketAddr
Expand Down
23 changes: 23 additions & 0 deletions pump/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,29 @@ var _ = Suite(&testConfigSuite{})

type testConfigSuite struct{}

func (s *testConfigSuite) TestValidate(c *C) {
cfg := Config{}
cfg.GC = 1
cfg.ListenAddr = "http://:8250"
cfg.EtcdURLs = "http://192.168.10.23:7777"

cfg.AdvertiseAddr = "http://:8250"
err := cfg.validate()
c.Check(err, ErrorMatches, ".*advertiseAddr.*")

cfg.AdvertiseAddr = "http://0.0.0.0:8250"
err = cfg.validate()
c.Check(err, ErrorMatches, ".*advertiseAddr.*")

cfg.AdvertiseAddr = "http://127.0.0.1:8250"
err = cfg.validate()
c.Check(err, IsNil)

cfg.AdvertiseAddr = "http://192.168.11.11:8250"
err = cfg.validate()
c.Check(err, IsNil)
}

func (s *testConfigSuite) TestConfigParsingCmdLineFlags(c *C) {
args := []string{
"--addr", "192.168.199.100:8260",
Expand Down

0 comments on commit 65f9434

Please sign in to comment.