diff --git a/config/config.go b/config/config.go index 2f771e38d13..3dd104aab0f 100644 --- a/config/config.go +++ b/config/config.go @@ -56,6 +56,7 @@ var ( Enabled: false, AllowUnsafe: []string{}, }, + PIDFileLocation: "/var/run/tyk/tyk-gateway.pid", } ) diff --git a/gateway/api.go b/gateway/api.go index b1aa078b38b..1bd78006aa0 100644 --- a/gateway/api.go +++ b/gateway/api.go @@ -2236,13 +2236,16 @@ func (gw *Gateway) createOauthClient(w http.ResponseWriter, r *http.Request) { storageManager := gw.getGlobalMDCBStorageHandler(prefix, false) storageManager.Connect() + storageDriver := &storage.RedisCluster{KeyPrefix: prefix, HashKeys: false, ConnectionHandler: gw.StorageConnectionHandler} + storageDriver.Connect() + apiSpec.OAuthManager = &OAuthManager{ OsinServer: gw.TykOsinNewServer( &osin.ServerConfig{}, &RedisOsinStorageInterface{ storageManager, gw.GlobalSessionManager, - &storage.RedisCluster{KeyPrefix: prefix, HashKeys: false, ConnectionHandler: gw.StorageConnectionHandler}, + storageDriver, apiSpec.OrgID, gw, }), @@ -2623,12 +2626,16 @@ func (gw *Gateway) getOauthClientDetails(keyName, apiID string) (interface{}, in prefix := generateOAuthPrefix(apiSpec.APIID) storageManager := gw.getGlobalMDCBStorageHandler(prefix, false) storageManager.Connect() + + storageDriver := &storage.RedisCluster{KeyPrefix: prefix, HashKeys: false, ConnectionHandler: gw.StorageConnectionHandler} + storageDriver.Connect() + apiSpec.OAuthManager = &OAuthManager{ OsinServer: gw.TykOsinNewServer(&osin.ServerConfig{}, &RedisOsinStorageInterface{ storageManager, gw.GlobalSessionManager, - &storage.RedisCluster{KeyPrefix: prefix, HashKeys: false, ConnectionHandler: gw.StorageConnectionHandler}, + storageDriver, apiSpec.OrgID, gw, }), diff --git a/gateway/api_loader.go b/gateway/api_loader.go index 90f954c9ca2..b59112f2919 100644 --- a/gateway/api_loader.go +++ b/gateway/api_loader.go @@ -45,9 +45,16 @@ type ChainObject struct { func (gw *Gateway) prepareStorage() generalStores { var gs generalStores + gs.redisStore = &storage.RedisCluster{KeyPrefix: "apikey-", HashKeys: gw.GetConfig().HashKeys, ConnectionHandler: gw.StorageConnectionHandler} + gs.redisStore.Connect() + gs.redisOrgStore = &storage.RedisCluster{KeyPrefix: "orgkey.", ConnectionHandler: gw.StorageConnectionHandler} + gs.redisOrgStore.Connect() + gs.healthStore = &storage.RedisCluster{KeyPrefix: "apihealth.", ConnectionHandler: gw.StorageConnectionHandler} + gs.healthStore.Connect() + gs.rpcAuthStore = &RPCStorageHandler{KeyPrefix: "apikey-", HashKeys: gw.GetConfig().HashKeys, Gw: gw} gs.rpcOrgStore = gw.getGlobalMDCBStorageHandler("orgkey.", false) @@ -56,7 +63,6 @@ func (gw *Gateway) prepareStorage() generalStores { } func (gw *Gateway) skipSpecBecauseInvalid(spec *APISpec, logger *logrus.Entry) bool { - switch spec.Protocol { case "", "http", "https": if spec.Proxy.ListenPath == "" { @@ -1064,7 +1070,8 @@ func (gw *Gateway) loadApps(specs []*APISpec) { gwListenPort := gw.GetConfig().ListenPort controlApiIsConfigured := (gw.GetConfig().ControlAPIPort != 0 && gw.GetConfig().ControlAPIPort != gwListenPort) || gw.GetConfig().ControlAPIHostname != "" - if gw.allApisAreMTLS() && !gw.GetConfig().Security.ControlAPIUseMutualTLS && !controlApiIsConfigured { + + if !gw.isRunningTests() && gw.allApisAreMTLS() && !gw.GetConfig().Security.ControlAPIUseMutualTLS && !controlApiIsConfigured { mainLog.Warning("All APIs are protected with mTLS, except for the control API. 
" + "We recommend configuring the control API port or control hostname to ensure consistent security measures") } diff --git a/gateway/coprocess_api.go b/gateway/coprocess_api.go index e2227f8ac86..892d1448a43 100644 --- a/gateway/coprocess_api.go +++ b/gateway/coprocess_api.go @@ -16,13 +16,15 @@ import ( // CoProcessDefaultKeyPrefix is used as a key prefix for this CP. const CoProcessDefaultKeyPrefix = "coprocess-data:" -func getStorageForPython(ctx context.Context) storage.RedisCluster { +func getStorageForPython(ctx context.Context) *storage.RedisCluster { rc := storage.NewConnectionHandler(ctx) go rc.Connect(ctx, nil, &config.Config{}) rc.WaitConnect(ctx) - return storage.RedisCluster{KeyPrefix: CoProcessDefaultKeyPrefix, ConnectionHandler: rc} + handler := &storage.RedisCluster{KeyPrefix: CoProcessDefaultKeyPrefix, ConnectionHandler: rc} + handler.Connect() + return handler } // TykStoreData is a CoProcess API function for storing data. diff --git a/gateway/coprocess_id_extractor_test.go b/gateway/coprocess_id_extractor_test.go index e1a328a850d..b5e89dbd793 100644 --- a/gateway/coprocess_id_extractor_test.go +++ b/gateway/coprocess_id_extractor_test.go @@ -31,19 +31,27 @@ const ( func (ts *Test) createSpecTestFrom(tb testing.TB, def *apidef.APIDefinition) *APISpec { tb.Helper() + loader := APIDefinitionLoader{Gw: ts.Gw} spec, _ := loader.MakeSpec(&model.MergedAPI{APIDefinition: def}, nil) tname := tb.Name() + redisStore := &storage.RedisCluster{KeyPrefix: tname + "-apikey.", ConnectionHandler: ts.Gw.StorageConnectionHandler} + redisStore.Connect() + healthStore := &storage.RedisCluster{KeyPrefix: tname + "-apihealth.", ConnectionHandler: ts.Gw.StorageConnectionHandler} + healthStore.Connect() + orgStore := &storage.RedisCluster{KeyPrefix: tname + "-orgKey.", ConnectionHandler: ts.Gw.StorageConnectionHandler} + orgStore.Connect() + spec.Init(redisStore, redisStore, healthStore, orgStore) return spec } -func (ts *Test) prepareExtractor(tb testing.TB, extractorSource apidef.IdExtractorSource, extractorType apidef.IdExtractorType, - config map[string]interface{}, disabled bool) (IdExtractor, *APISpec) { +func (ts *Test) prepareExtractor(tb testing.TB, extractorSource apidef.IdExtractorSource, extractorType apidef.IdExtractorType, config map[string]interface{}, disabled bool) (IdExtractor, *APISpec) { tb.Helper() + def := &apidef.APIDefinition{ OrgID: MockOrgID, CustomMiddleware: apidef.MiddlewareSection{ diff --git a/gateway/delete_api_cache.go b/gateway/delete_api_cache.go index 5cc138ec13b..0a28e37e2c0 100644 --- a/gateway/delete_api_cache.go +++ b/gateway/delete_api_cache.go @@ -8,5 +8,7 @@ import ( func (gw *Gateway) invalidateAPICache(apiID string) bool { store := storage.RedisCluster{IsCache: true, ConnectionHandler: gw.StorageConnectionHandler} + store.Connect() + return store.DeleteScanMatch(fmt.Sprintf("cache-%s*", apiID)) } diff --git a/gateway/event_handler_webhooks.go b/gateway/event_handler_webhooks.go index e985a7b3ab7..0e572efb829 100644 --- a/gateway/event_handler_webhooks.go +++ b/gateway/event_handler_webhooks.go @@ -161,9 +161,14 @@ func (w *WebHookHandler) getRequestMethod(m string) WebHookRequestMethod { } func (w *WebHookHandler) checkURL(r string) bool { + if r == "" { + return false + } + log.WithFields(logrus.Fields{ "prefix": "webhooks", }).Debug("Checking URL: ", r) + if _, err := url.ParseRequestURI(r); err != nil { log.WithFields(logrus.Fields{ "prefix": "webhooks", diff --git a/gateway/gateway_test.go b/gateway/gateway_test.go index e104fccee6c..8244dead4b7 
100644 --- a/gateway/gateway_test.go +++ b/gateway/gateway_test.go @@ -889,7 +889,10 @@ func TestCacheAllSafeRequests(t *testing.T) { t.Skip() // DeleteScanMatch interferes with other tests. ts := StartTest(nil) + cache := storage.RedisCluster{KeyPrefix: "cache-", ConnectionHandler: ts.Gw.StorageConnectionHandler} + cache.Connect() + t.Cleanup(func() { ts.Close() cache.DeleteScanMatch("*") @@ -919,7 +922,10 @@ func TestCacheAllSafeRequestsWithCachedHeaders(t *testing.T) { t.Skip() // DeleteScanMatch interferes with other tests. ts := StartTest(nil) + cache := storage.RedisCluster{KeyPrefix: "cache-", ConnectionHandler: ts.Gw.StorageConnectionHandler} + cache.Connect() + t.Cleanup(func() { ts.Close() cache.DeleteScanMatch("*") @@ -965,7 +971,10 @@ func TestCacheWithAdvanceUrlRewrite(t *testing.T) { t.Skip() // DeleteScanMatch interferes with other tests. ts := StartTest(nil) + cache := storage.RedisCluster{KeyPrefix: "cache-", ConnectionHandler: ts.Gw.StorageConnectionHandler} + cache.Connect() + t.Cleanup(func() { ts.Close() cache.DeleteScanMatch("*") @@ -1024,7 +1033,10 @@ func TestCachePostRequest(t *testing.T) { t.Skip() // DeleteScanMatch interferes with other tests. ts := StartTest(nil) + cache := storage.RedisCluster{KeyPrefix: "cache-", ConnectionHandler: ts.Gw.StorageConnectionHandler} + cache.Connect() + t.Cleanup(func() { ts.Close() cache.DeleteScanMatch("*") @@ -1071,7 +1083,10 @@ func TestAdvanceCachePutRequest(t *testing.T) { t.Skip() // DeleteScanMatch interferes with other tests. ts := StartTest(nil) + cache := storage.RedisCluster{KeyPrefix: "cache-", ConnectionHandler: ts.Gw.StorageConnectionHandler} + cache.Connect() + t.Cleanup(func() { ts.Close() cache.DeleteScanMatch("*") @@ -1164,7 +1179,10 @@ func TestCacheAllSafeRequestsWithAdvancedCacheEndpoint(t *testing.T) { t.Skip() // DeleteScanMatch interferes with other tests. ts := StartTest(nil) + cache := storage.RedisCluster{KeyPrefix: "cache-", ConnectionHandler: ts.Gw.StorageConnectionHandler} + cache.Connect() + t.Cleanup(func() { ts.Close() cache.DeleteScanMatch("*") @@ -1203,7 +1221,9 @@ func TestCacheEtag(t *testing.T) { t.Skip() // DeleteScanMatch interferes with other tests. ts := StartTest(nil) + cache := storage.RedisCluster{KeyPrefix: "cache-", ConnectionHandler: ts.Gw.StorageConnectionHandler} + cache.Connect() t.Cleanup(func() { ts.Close() @@ -1258,7 +1278,10 @@ func TestOldCachePlugin(t *testing.T) { t.Helper() ts := StartTest(nil) + cache := storage.RedisCluster{KeyPrefix: "cache-", ConnectionHandler: ts.Gw.StorageConnectionHandler} + cache.Connect() + t.Cleanup(func() { ts.Close() cache.DeleteScanMatch("*") @@ -1287,7 +1310,10 @@ func TestAdvanceCacheTimeoutPerEndpoint(t *testing.T) { t.Skip() // DeleteScanMatch interferes with other tests. 
ts := StartTest(nil) + cache := storage.RedisCluster{KeyPrefix: "cache-", ConnectionHandler: ts.Gw.StorageConnectionHandler} + cache.Connect() + t.Cleanup(func() { ts.Close() cache.DeleteScanMatch("*") diff --git a/gateway/health_check.go b/gateway/health_check.go index 1a3c57d7d43..389c2355e21 100644 --- a/gateway/health_check.go +++ b/gateway/health_check.go @@ -79,6 +79,7 @@ func (gw *Gateway) gatherHealthChecks() { allInfos := SafeHealthCheck{info: make(map[string]HealthCheckItem, 3)} redisStore := storage.RedisCluster{KeyPrefix: "livenesscheck-", ConnectionHandler: gw.StorageConnectionHandler} + redisStore.Connect() key := "tyk-liveness-probe" diff --git a/gateway/host_checker.go b/gateway/host_checker.go index 44df5eb6689..4a26f949fa6 100644 --- a/gateway/host_checker.go +++ b/gateway/host_checker.go @@ -398,6 +398,10 @@ func eraseSyncMap(m *sync.Map) { } func (h *HostUptimeChecker) Stop() { + if h == nil { + return + } + was := atomic.SwapInt32(&h.isClosed, CLOSED) if was == OPEN { eraseSyncMap(h.samples) diff --git a/gateway/host_checker_manager.go b/gateway/host_checker_manager.go index 76ac2c9b201..8365124f2dc 100644 --- a/gateway/host_checker_manager.go +++ b/gateway/host_checker_manager.go @@ -82,7 +82,7 @@ func (hc *HostCheckerManager) Init(store storage.Handler) { func (hc *HostCheckerManager) Start(ctx context.Context) { // Start loop to check if we are active instance - if hc.Id != "" { + if hc != nil { go hc.CheckActivePollerLoop(ctx) } } @@ -181,7 +181,6 @@ func (hc *HostCheckerManager) AmIPolling() bool { } func (hc *HostCheckerManager) StartPoller(ctx context.Context) { - log.WithFields(logrus.Fields{ "prefix": "host-check-mgr", }).Debug("---> Initialising checker") @@ -192,7 +191,8 @@ func (hc *HostCheckerManager) StartPoller(ctx context.Context) { hc.checker = &HostUptimeChecker{Gw: hc.Gw} } - hc.checker.Init(hc.Gw.GetConfig().UptimeTests.Config.CheckerPoolSize, + hc.checker.Init( + hc.Gw.GetConfig().UptimeTests.Config.CheckerPoolSize, hc.Gw.GetConfig().UptimeTests.Config.FailureTriggerSampleSize, hc.Gw.GetConfig().UptimeTests.Config.TimeWait, hc.currentHostList, @@ -207,14 +207,21 @@ func (hc *HostCheckerManager) StartPoller(ctx context.Context) { log.WithFields(logrus.Fields{ "prefix": "host-check-mgr", }).Debug("---> Starting checker") + hc.checker.Start(ctx) + log.WithFields(logrus.Fields{ "prefix": "host-check-mgr", }).Debug("---> Checker started.") + hc.checkerMu.Unlock() } func (hc *HostCheckerManager) StopPoller() { + if hc == nil { + return + } + hc.checkerMu.Lock() hc.checker.Stop() hc.checkerMu.Unlock() @@ -536,11 +543,11 @@ func (hc *HostCheckerManager) RecordUptimeAnalytics(report HostHealthReport) err func (gw *Gateway) InitHostCheckManager(ctx context.Context, store storage.Handler) { // Already initialized - if gw.GlobalHostChecker.Id != "" { + if gw.GlobalHostChecker != nil { return } - gw.GlobalHostChecker = HostCheckerManager{Gw: gw} + gw.GlobalHostChecker = &HostCheckerManager{Gw: gw} gw.GlobalHostChecker.Init(store) gw.GlobalHostChecker.Start(ctx) } diff --git a/gateway/host_checker_manager_test.go b/gateway/host_checker_manager_test.go index d50e6dbe266..7a21486032d 100644 --- a/gateway/host_checker_manager_test.go +++ b/gateway/host_checker_manager_test.go @@ -14,7 +14,10 @@ func TestHostCheckerManagerInit(t *testing.T) { defer ts.Close() hc := HostCheckerManager{Gw: ts.Gw} + redisStorage := &storage.RedisCluster{KeyPrefix: "host-checker-test:", ConnectionHandler: ts.Gw.StorageConnectionHandler} + redisStorage.Connect() + hc.Init(redisStorage) 
if hc.Id == "" { @@ -46,6 +49,8 @@ func TestAmIPolling(t *testing.T) { ts.Gw.SetConfig(globalConf) redisStorage := &storage.RedisCluster{KeyPrefix: "host-checker-test:", ConnectionHandler: ts.Gw.StorageConnectionHandler} + redisStorage.Connect() + hc.Init(redisStorage) hc2 := HostCheckerManager{Gw: ts.Gw} hc2.Init(redisStorage) @@ -74,7 +79,10 @@ func TestAmIPolling(t *testing.T) { //Testing if the PollerCacheKey doesn't contains the poller_group by default hc = HostCheckerManager{Gw: ts.Gw} + redisStorage = &storage.RedisCluster{KeyPrefix: "host-checker-test:", ConnectionHandler: ts.Gw.StorageConnectionHandler} + redisStorage.Connect() + hc.Init(redisStorage) hc.AmIPolling() @@ -106,7 +114,10 @@ func TestCheckActivePollerLoop(t *testing.T) { defer ts.Close() hc := &HostCheckerManager{Gw: ts.Gw} + redisStorage := &storage.RedisCluster{KeyPrefix: "host-checker-test-1:", ConnectionHandler: ts.Gw.StorageConnectionHandler} + redisStorage.Connect() + hc.Init(redisStorage) go hc.CheckActivePollerLoop(ts.Gw.ctx) @@ -122,7 +133,10 @@ func TestStartPoller(t *testing.T) { defer ts.Close() hc := HostCheckerManager{Gw: ts.Gw} + redisStorage := &storage.RedisCluster{KeyPrefix: "host-checker-TestStartPoller:", ConnectionHandler: ts.Gw.StorageConnectionHandler} + redisStorage.Connect() + hc.Init(redisStorage) hc.StartPoller(ts.Gw.ctx) @@ -135,9 +149,12 @@ func TestStartPoller(t *testing.T) { func TestRecordUptimeAnalytics(t *testing.T) { ts := StartTest(nil) defer ts.Close() + hc := &HostCheckerManager{Gw: ts.Gw} redisStorage := &storage.RedisCluster{KeyPrefix: "host-checker-test-analytics:", ConnectionHandler: ts.Gw.StorageConnectionHandler} + redisStorage.Connect() + hc.Init(redisStorage) spec := &APISpec{} diff --git a/gateway/mw_external_oauth.go b/gateway/mw_external_oauth.go index 39730badb4d..dcf564ee1b7 100644 --- a/gateway/mw_external_oauth.go +++ b/gateway/mw_external_oauth.go @@ -296,11 +296,14 @@ func isExpired(claims jwt.MapClaims) bool { } func newIntrospectionCache(gw *Gateway) *introspectionCache { - return &introspectionCache{RedisCluster: storage.RedisCluster{KeyPrefix: "introspection-", ConnectionHandler: gw.StorageConnectionHandler}} + conn := &storage.RedisCluster{KeyPrefix: "introspection-", ConnectionHandler: gw.StorageConnectionHandler} + conn.Connect() + + return &introspectionCache{RedisCluster: conn} } type introspectionCache struct { - storage.RedisCluster + *storage.RedisCluster } func (c *introspectionCache) GetRes(token string) (jwt.MapClaims, bool) { diff --git a/gateway/mw_jwt.go b/gateway/mw_jwt.go index d00bec84659..e61be690dd1 100644 --- a/gateway/mw_jwt.go +++ b/gateway/mw_jwt.go @@ -579,12 +579,16 @@ func (k *JWTMiddleware) processCentralisedJWT(r *http.Request, token *jwt.Token) prefix := generateOAuthPrefix(k.Spec.APIID) storageManager := k.Gw.getGlobalMDCBStorageHandler(prefix, false) storageManager.Connect() + + storageDriver := &storage.RedisCluster{KeyPrefix: prefix, HashKeys: false, ConnectionHandler: k.Gw.StorageConnectionHandler} + storageDriver.Connect() + k.Spec.OAuthManager = &OAuthManager{ OsinServer: k.Gw.TykOsinNewServer(&osin.ServerConfig{}, &RedisOsinStorageInterface{ storageManager, k.Gw.GlobalSessionManager, - &storage.RedisCluster{KeyPrefix: prefix, HashKeys: false, ConnectionHandler: k.Gw.StorageConnectionHandler}, + storageDriver, k.Spec.OrgID, k.Gw, }), diff --git a/gateway/mw_oauth2_auth_ee.go b/gateway/mw_oauth2_auth_ee.go index a034ea7d019..84224c3e403 100644 --- a/gateway/mw_oauth2_auth_ee.go +++ b/gateway/mw_oauth2_auth_ee.go @@ -22,9 
+22,13 @@ func getUpstreamOAuthMw(base *BaseMiddleware) TykMiddleware { } func getClientCredentialsStorageHandler(base *BaseMiddleware) *storage.RedisCluster { - return &storage.RedisCluster{KeyPrefix: "upstreamOAuthCC-", ConnectionHandler: base.Gw.StorageConnectionHandler} + handler := &storage.RedisCluster{KeyPrefix: "upstreamOAuthCC-", ConnectionHandler: base.Gw.StorageConnectionHandler} + handler.Connect() + return handler } func getPasswordStorageHandler(base *BaseMiddleware) *storage.RedisCluster { - return &storage.RedisCluster{KeyPrefix: "upstreamOAuthPW-", ConnectionHandler: base.Gw.StorageConnectionHandler} + handler := &storage.RedisCluster{KeyPrefix: "upstreamOAuthPW-", ConnectionHandler: base.Gw.StorageConnectionHandler} + handler.Connect() + return handler } diff --git a/gateway/oauth_manager.go b/gateway/oauth_manager.go index ef95b45f5f2..1588e835ea1 100644 --- a/gateway/oauth_manager.go +++ b/gateway/oauth_manager.go @@ -1195,6 +1195,7 @@ func (gw *Gateway) purgeLapsedOAuthTokens() error { } redisCluster := &storage.RedisCluster{KeyPrefix: "", HashKeys: false, ConnectionHandler: gw.StorageConnectionHandler} + redisCluster.Connect() ok, err := redisCluster.Lock("oauth-purge-lock", time.Minute) if err != nil { diff --git a/gateway/redis_logrus_hook.go b/gateway/redis_logrus_hook.go index b5a41c78eab..fcd172675a2 100644 --- a/gateway/redis_logrus_hook.go +++ b/gateway/redis_logrus_hook.go @@ -16,13 +16,13 @@ type redisChannelHook struct { func (gw *Gateway) newRedisHook() *redisChannelHook { hook := &redisChannelHook{} hook.formatter = new(logrus.JSONFormatter) - hook.notifier.store = &storage.RedisCluster{KeyPrefix: "gateway-notifications:", ConnectionHandler: gw.StorageConnectionHandler} hook.notifier.channel = "dashboard.ui.messages" + hook.notifier.store = &storage.RedisCluster{KeyPrefix: "gateway-notifications:", ConnectionHandler: gw.StorageConnectionHandler} + hook.notifier.store.Connect() return hook } func (hook *redisChannelHook) Fire(entry *logrus.Entry) error { - orgId, found := entry.Data["org_id"] if !found { return nil @@ -44,6 +44,7 @@ func (hook *redisChannelHook) Fire(entry *logrus.Entry) error { } go hook.notifier.Notify(n) + return nil } diff --git a/gateway/rpc_backup_handlers.go b/gateway/rpc_backup_handlers.go index 121b7fb64bc..5c8416d0ff8 100644 --- a/gateway/rpc_backup_handlers.go +++ b/gateway/rpc_backup_handlers.go @@ -30,8 +30,9 @@ func (gw *Gateway) LoadDefinitionsFromRPCBackup() ([]*APISpec, error) { tagList := getTagListAsString(gw.GetConfig().DBAppConfOptions.Tags) checkKey := BackupApiKeyBase + tagList - store := storage.RedisCluster{KeyPrefix: RPCKeyPrefix, ConnectionHandler: gw.StorageConnectionHandler} + store := &storage.RedisCluster{KeyPrefix: RPCKeyPrefix, ConnectionHandler: gw.StorageConnectionHandler} connected := store.Connect() + log.Info("[RPC] --> Loading API definitions from backup") if !connected { @@ -60,7 +61,7 @@ func (gw *Gateway) saveRPCDefinitionsBackup(list string) error { log.Info("--> Connecting to DB") - store := storage.RedisCluster{KeyPrefix: RPCKeyPrefix, ConnectionHandler: gw.StorageConnectionHandler} + store := &storage.RedisCluster{KeyPrefix: RPCKeyPrefix, ConnectionHandler: gw.StorageConnectionHandler} connected := store.Connect() log.Info("--> Connected to DB") @@ -83,9 +84,9 @@ func (gw *Gateway) LoadPoliciesFromRPCBackup() (map[string]user.Policy, error) { tagList := getTagListAsString(gw.GetConfig().DBAppConfOptions.Tags) checkKey := BackupPolicyKeyBase + tagList - store := storage.RedisCluster{KeyPrefix: 
RPCKeyPrefix, ConnectionHandler: gw.StorageConnectionHandler} - + store := &storage.RedisCluster{KeyPrefix: RPCKeyPrefix, ConnectionHandler: gw.StorageConnectionHandler} connected := store.Connect() + log.Info("[RPC] Loading Policies from backup") if !connected { diff --git a/gateway/rpc_storage_handler.go b/gateway/rpc_storage_handler.go index ba01c439fc6..eb698a3a648 100644 --- a/gateway/rpc_storage_handler.go +++ b/gateway/rpc_storage_handler.go @@ -162,7 +162,7 @@ func (r *RPCStorageHandler) buildNodeInfo() []byte { intCheckDuration = int64(checkDuration / time.Second) } - r.Gw.getHostDetails(r.Gw.GetConfig().PIDFileLocation) + r.Gw.getHostDetails() node := model.NodeData{ NodeID: r.Gw.GetNodeID(), GroupID: config.SlaveOptions.GroupID, diff --git a/gateway/server.go b/gateway/server.go index 2b206c70214..767fe0c8277 100644 --- a/gateway/server.go +++ b/gateway/server.go @@ -119,7 +119,7 @@ type Gateway struct { RPCListener RPCStorageHandler DashService DashboardServiceSender CertificateManager certs.CertificateManager - GlobalHostChecker HostCheckerManager + GlobalHostChecker *HostCheckerManager ConnectionWatcher *httputil.ConnectionWatcher HostCheckTicker chan struct{} HostCheckerClient *http.Client @@ -374,17 +374,22 @@ func (gw *Gateway) setupGlobals() { mainLog.Warn("Running Uptime checks in a management node.") } - healthCheckStore := storage.RedisCluster{KeyPrefix: "host-checker:", IsAnalytics: true, ConnectionHandler: gw.StorageConnectionHandler} - gw.InitHostCheckManager(gw.ctx, &healthCheckStore) + healthCheckStore := &storage.RedisCluster{KeyPrefix: "host-checker:", IsAnalytics: true, ConnectionHandler: gw.StorageConnectionHandler} + healthCheckStore.Connect() + + gw.InitHostCheckManager(gw.ctx, healthCheckStore) } gw.initHealthCheck(gw.ctx) - redisStore := storage.RedisCluster{KeyPrefix: "apikey-", HashKeys: gwConfig.HashKeys, ConnectionHandler: gw.StorageConnectionHandler} - gw.GlobalSessionManager.Init(&redisStore) + redisStore := &storage.RedisCluster{KeyPrefix: "apikey-", HashKeys: gwConfig.HashKeys, ConnectionHandler: gw.StorageConnectionHandler} + redisStore.Connect() + + gw.GlobalSessionManager.Init(redisStore) - versionStore := storage.RedisCluster{KeyPrefix: "version-check-", ConnectionHandler: gw.StorageConnectionHandler} + versionStore := &storage.RedisCluster{KeyPrefix: "version-check-", ConnectionHandler: gw.StorageConnectionHandler} versionStore.Connect() + err := versionStore.SetKey("gateway", VERSION, 0) if err != nil { @@ -397,12 +402,16 @@ func (gw *Gateway) setupGlobals() { gw.SetConfig(Conf) mainLog.Debug("Setting up analytics DB connection") - analyticsStore := storage.RedisCluster{KeyPrefix: "analytics-", IsAnalytics: true, ConnectionHandler: gw.StorageConnectionHandler} - gw.Analytics.Store = &analyticsStore + analyticsStore := &storage.RedisCluster{KeyPrefix: "analytics-", IsAnalytics: true, ConnectionHandler: gw.StorageConnectionHandler} + analyticsStore.Connect() + + gw.Analytics.Store = analyticsStore gw.Analytics.Init() - store := storage.RedisCluster{KeyPrefix: "analytics-", IsAnalytics: true, ConnectionHandler: gw.StorageConnectionHandler} - redisPurger := RedisPurger{Store: &store, Gw: gw} + store := &storage.RedisCluster{KeyPrefix: "analytics-", IsAnalytics: true, ConnectionHandler: gw.StorageConnectionHandler} + store.Connect() + + redisPurger := RedisPurger{Store: store, Gw: gw} go redisPurger.PurgeLoop(gw.ctx) if gw.GetConfig().AnalyticsConfig.Type == "rpc" { @@ -411,11 +420,14 @@ func (gw *Gateway) setupGlobals() { } else { 
mainLog.Debug("Using RPC cache purge") - store := storage.RedisCluster{KeyPrefix: "analytics-", IsAnalytics: true, ConnectionHandler: gw.StorageConnectionHandler} + store := &storage.RedisCluster{KeyPrefix: "analytics-", IsAnalytics: true, ConnectionHandler: gw.StorageConnectionHandler} + store.Connect() + purger := rpc.Purger{ - Store: &store, + Store: store, } purger.Connect() + go purger.PurgeLoop(gw.ctx, time.Duration(gw.GetConfig().AnalyticsConfig.PurgeInterval)) } @@ -431,8 +443,10 @@ func (gw *Gateway) setupGlobals() { // Get the notifier ready mainLog.Debug("Notifier will not work in hybrid mode") + mainNotifierStore := &storage.RedisCluster{ConnectionHandler: gw.StorageConnectionHandler} mainNotifierStore.Connect() + gw.MainNotifier = RedisNotifier{mainNotifierStore, RedisPubSubChannel, gw} if gwConfig.Monitor.EnableTriggerMonitors { @@ -456,7 +470,10 @@ func (gw *Gateway) setupGlobals() { } storeCert := &storage.RedisCluster{KeyPrefix: "cert-", HashKeys: false, ConnectionHandler: gw.StorageConnectionHandler} + storeCert.Connect() + gw.CertificateManager = certs.NewCertificateManager(storeCert, certificateSecret, log, !gw.GetConfig().Cloud) + if gw.GetConfig().SlaveOptions.UseRPC { rpcStore := &RPCStorageHandler{ KeyPrefix: "cert-", @@ -774,10 +791,14 @@ func (gw *Gateway) addOAuthHandlers(spec *APISpec, muxer *mux.Router) *OAuthMana prefix := generateOAuthPrefix(spec.APIID) storageManager := gw.getGlobalMDCBStorageHandler(prefix, false) storageManager.Connect() + + storageDriver := &storage.RedisCluster{KeyPrefix: prefix, HashKeys: false, ConnectionHandler: gw.StorageConnectionHandler} + storageDriver.Connect() + osinStorage := &RedisOsinStorageInterface{ storageManager, gw.GlobalSessionManager, - &storage.RedisCluster{KeyPrefix: prefix, HashKeys: false, ConnectionHandler: gw.StorageConnectionHandler}, + storageDriver, spec.OrgID, gw, } @@ -1105,7 +1126,6 @@ func (gw *Gateway) reloadQueueLoop(cb ...func()) { for { select { case <-gw.ctx.Done(): - log.Warn("Canceled ctx in reloadQueueLoop") return case fn := <-gw.reloadQueue: gw.requeueLock.Lock() @@ -1275,9 +1295,6 @@ func (gw *Gateway) initSystem() error { return err } - if gwConfig.PIDFileLocation == "" { - gwConfig.PIDFileLocation = "/var/run/tyk/tyk-gateway.pid" - } gw.SetConfig(gwConfig) gw.afterConfSetup() } @@ -1289,11 +1306,26 @@ func (gw *Gateway) initSystem() error { mainLog.Fatal("Redis connection details not set, please ensure that the storage type is set to Redis and that the connection parameters are correct.") } + go gw.StorageConnectionHandler.Connect(gw.ctx, func() { + gw.reloadURLStructure(func() {}) + }, &gwConfig) + + timeout, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + connected := gw.StorageConnectionHandler.WaitConnect(timeout) + if !connected { + mainLog.Error("storage: timeout connecting") + } else { + mainLog.Info("storage: connected to " + gwConfig.Storage.Type) + } + // suply rpc client globals to join it main loging and instrumentation sub systems rpc.Log = log rpc.Instrument = instrument gw.setupGlobals() + gwConfig = gw.GetConfig() if *cli.Port != "" { portNum, err := strconv.Atoi(*cli.Port) @@ -1310,7 +1342,7 @@ func (gw *Gateway) initSystem() error { mainLog.Info("PIDFile location set to: ", gwConfig.PIDFileLocation) if err := writePIDFile(gw.GetConfig().PIDFileLocation); err != nil { - mainLog.Error("Failed to write PIDFile: ", err) + mainLog.Warn("Failed to write PIDFile: ", err) } if gw.GetConfig().UseDBAppConfigs && gw.GetConfig().Policies.PolicySource 
!= config.DefaultDashPolicySource { @@ -1351,7 +1383,7 @@ func (gw *Gateway) initSystem() error { gw.SetConfig(gwConfig) config.Global = gw.GetConfig - gw.getHostDetails(gw.GetConfig().PIDFileLocation) + gw.getHostDetails() gw.initRPCCache() gw.setupInstrumentation() @@ -1387,22 +1419,18 @@ func (gw *Gateway) SignatureVerifier() (goverify.Verifier, error) { return verifier, nil } +func getPID() string { + return strconv.Itoa(os.Getpid()) +} + func writePIDFile(file string) error { if err := os.MkdirAll(filepath.Dir(file), 0755); err != nil { return err } - pid := strconv.Itoa(os.Getpid()) + pid := getPID() return ioutil.WriteFile(file, []byte(pid), 0600) } -var readPIDFromFile = func(file string) (int, error) { - b, err := ioutil.ReadFile(file) - if err != nil { - return 0, err - } - return strconv.Atoi(string(b)) -} - // afterConfSetup takes care of non-sensical config values (such as zero // timeouts) and sets up a few globals that depend on the config. func (gw *Gateway) afterConfSetup() { @@ -1580,11 +1608,9 @@ func (gw *Gateway) setUpConsul() error { var getIpAddress = netutil.GetIpAddress -func (gw *Gateway) getHostDetails(file string) { +func (gw *Gateway) getHostDetails() { var err error - if gw.hostDetails.PID, err = readPIDFromFile(file); err != nil { - mainLog.Error("Failed ot get host pid: ", err) - } + gw.hostDetails.PID = os.Getpid() if gw.hostDetails.Hostname, err = os.Hostname(); err != nil { mainLog.Error("Failed to get hostname: ", err) } @@ -1603,6 +1629,8 @@ func (gw *Gateway) getHostDetails(file string) { func (gw *Gateway) getGlobalMDCBStorageHandler(keyPrefix string, hashKeys bool) storage.Handler { localStorage := &storage.RedisCluster{KeyPrefix: keyPrefix, HashKeys: hashKeys, ConnectionHandler: gw.StorageConnectionHandler} + localStorage.Connect() + logger := logrus.New().WithFields(logrus.Fields{"prefix": "mdcb-storage-handler"}) if gw.GetConfig().SlaveOptions.UseRPC { @@ -1628,7 +1656,9 @@ func (gw *Gateway) getGlobalStorageHandler(keyPrefix string, hashKeys bool) stor Gw: gw, } } - return &storage.RedisCluster{KeyPrefix: keyPrefix, HashKeys: hashKeys, ConnectionHandler: gw.StorageConnectionHandler} + handler := &storage.RedisCluster{KeyPrefix: keyPrefix, HashKeys: hashKeys, ConnectionHandler: gw.StorageConnectionHandler} + handler.Connect() + return handler } func Start() { @@ -1653,10 +1683,6 @@ func Start() { gwConfig = *defaultConfig } - if gwConfig.PIDFileLocation == "" { - gwConfig.PIDFileLocation = "/var/run/tyk/tyk-gateway.pid" - } - gw := NewGateway(gwConfig, ctx) if err := gw.initSystem(); err != nil { @@ -1664,7 +1690,7 @@ func Start() { } gwConfig = gw.GetConfig() - if gwConfig.ControlAPIPort == 0 { + if !gw.isRunningTests() && gwConfig.ControlAPIPort == 0 { mainLog.Warn("The control_api_port should be changed for production") } @@ -1712,10 +1738,6 @@ func Start() { gw.GetConfig().DBAppConfOptions.Tags) gw.start() - configs := gw.GetConfig() - go gw.StorageConnectionHandler.Connect(gw.ctx, func() { - gw.reloadURLStructure(func() {}) - }, &configs) unix := time.Now().Unix() diff --git a/gateway/server_test.go b/gateway/server_test.go index 8bac1296249..c51c6174f9d 100644 --- a/gateway/server_test.go +++ b/gateway/server_test.go @@ -252,7 +252,6 @@ func gatewayGetHostDetailsTestAddress() gatewayGetHostDetailsTestCheckFn { func defineGatewayGetHostDetailsTests() []struct { name string before func(*Gateway) - readPIDFromFile func(string) (int, error) netutilGetIpAddress func() ([]string, error) checks []gatewayGetHostDetailsTestCheckFn } { @@ -261,13 
+260,11 @@ func defineGatewayGetHostDetailsTests() []struct { return []struct { name string before func(*Gateway) - readPIDFromFile func(string) (int, error) netutilGetIpAddress func() ([]string, error) checks []gatewayGetHostDetailsTestCheckFn }{ { - name: "fail-read-pid", - readPIDFromFile: func(_ string) (int, error) { return 0, fmt.Errorf("Error opening file") }, + name: "fail-read-pid", before: func(gw *Gateway) { gw.SetConfig(config.Config{ ListenAddress: "127.0.0.1", @@ -278,8 +275,7 @@ func defineGatewayGetHostDetailsTests() []struct { ), }, { - name: "success-listen-address-set", - readPIDFromFile: func(string) (int, error) { return 1000, nil }, + name: "success-listen-address-set", before: func(gw *Gateway) { gw.SetConfig(config.Config{ ListenAddress: "127.0.0.1", @@ -291,8 +287,7 @@ func defineGatewayGetHostDetailsTests() []struct { ), }, { - name: "success-listen-address-not-set", - readPIDFromFile: func(_ string) (int, error) { return 1000, nil }, + name: "success-listen-address-not-set", before: func(gw *Gateway) { gw.SetConfig(config.Config{ ListenAddress: "", @@ -304,8 +299,7 @@ func defineGatewayGetHostDetailsTests() []struct { ), }, { - name: "fail-getting-network-address", - readPIDFromFile: func(_ string) (int, error) { return 1000, nil }, + name: "fail-getting-network-address", before: func(gw *Gateway) { gw.SetConfig(config.Config{ ListenAddress: "", @@ -325,17 +319,15 @@ func TestGatewayGetHostDetails(t *testing.T) { t.Skip() var ( - orig_readPIDFromFile = readPIDFromFile - orig_mainLog = mainLog - orig_getIpAddress = netutil.GetIpAddress - bl = test.NewBufferingLogger() + orig_mainLog = mainLog + orig_getIpAddress = netutil.GetIpAddress + bl = test.NewBufferingLogger() ) tests := defineGatewayGetHostDetailsTests() // restore the original functions defer func() { - readPIDFromFile = orig_readPIDFromFile mainLog = orig_mainLog getIpAddress = orig_getIpAddress }() @@ -346,9 +338,6 @@ func TestGatewayGetHostDetails(t *testing.T) { bl.ClearLogs() // replace functions with mocks mainLog = bl.Logger.WithField("prefix", "test") - if tt.readPIDFromFile != nil { - readPIDFromFile = tt.readPIDFromFile - } if tt.netutilGetIpAddress != nil { getIpAddress = tt.netutilGetIpAddress @@ -360,7 +349,7 @@ func TestGatewayGetHostDetails(t *testing.T) { tt.before(gw) } - gw.getHostDetails(gw.GetConfig().PIDFileLocation) + gw.getHostDetails() for _, c := range tt.checks { c(t, bl, gw) } diff --git a/gateway/testutil.go b/gateway/testutil.go index 106e9fa3266..a55e60e7582 100644 --- a/gateway/testutil.go +++ b/gateway/testutil.go @@ -1288,9 +1288,10 @@ func (s *Test) Close() { } } + gwConfig := s.Gw.GetConfig() + s.Gw.DefaultProxyMux.swap(&proxyMux{}, s.Gw) if s.config.SeparateControlAPI { - gwConfig := s.Gw.GetConfig() gwConfig.ControlAPIPort = 0 s.Gw.SetConfig(gwConfig) }
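The recurring change across api.go, api_loader.go, server.go, mw_jwt.go and the tests is that every storage.RedisCluster is now connected explicitly right after construction, instead of being handed around unconnected. A minimal sketch of that pattern, assuming it sits in the gateway package; the newConnectedStore helper below is illustrative and not part of this diff:

    // newConnectedStore mirrors the construct-then-Connect sequence the diff applies at each call site.
    func newConnectedStore(gw *Gateway, prefix string, hashKeys bool) *storage.RedisCluster {
        store := &storage.RedisCluster{
            KeyPrefix:         prefix,
            HashKeys:          hashKeys,
            ConnectionHandler: gw.StorageConnectionHandler,
        }
        store.Connect() // connect up front, as each updated call site now does
        return store
    }

A call such as gs.redisStore = newConnectedStore(gw, "apikey-", gw.GetConfig().HashKeys) could replace the repeated two-line construct/Connect blocks, if the project ever wants to deduplicate them.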
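GlobalHostChecker changes from a HostCheckerManager value to a *HostCheckerManager, and HostUptimeChecker.Stop, HostCheckerManager.StopPoller and HostCheckerManager.Start now guard against a nil receiver, so code paths that stop the poller on a gateway that never initialised uptime checks no longer panic. Illustrative only; ctx stands in for whatever context the caller holds:

    ctx := context.Background()

    var hc *HostCheckerManager // nil until InitHostCheckManager constructs one
    hc.Start(ctx)              // no-op: Start only spawns the poller loop for a non-nil manager
    hc.StopPoller()            // no-op: StopPoller returns early on a nil receiver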
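PID handling is also reshuffled: the "/var/run/tyk/tyk-gateway.pid" default moves into the config package defaults (so initSystem and Start no longer patch it in), getHostDetails reports os.Getpid() instead of reading the PID back from the file, and a failed PID-file write is downgraded to a warning. Condensed from the server.go hunks above:

    gw.hostDetails.PID = os.Getpid() // the running process is authoritative; no file read-back

    if err := writePIDFile(gw.GetConfig().PIDFileLocation); err != nil {
        mainLog.Warn("Failed to write PIDFile: ", err) // best-effort; startup continues
    }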
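Storage bootstrapping moves earlier as well: initSystem now kicks off StorageConnectionHandler.Connect in the background and waits up to one second for it before continuing, where previously Start() connected only after gw.start(). Condensed from the server.go hunk, using the same names as the diff:

    go gw.StorageConnectionHandler.Connect(gw.ctx, func() {
        gw.reloadURLStructure(func() {})
    }, &gwConfig)

    timeout, cancel := context.WithTimeout(context.Background(), time.Second)
    defer cancel()

    if !gw.StorageConnectionHandler.WaitConnect(timeout) {
        mainLog.Error("storage: timeout connecting") // logged, not fatal; startup proceeds
    } else {
        mainLog.Info("storage: connected to " + gwConfig.Storage.Type)
    }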
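Finally, introspectionCache now embeds *storage.RedisCluster rather than the value type, presumably so the instance that newIntrospectionCache connects is the same one GetRes later uses instead of a copy; that rationale is an inference, while the shape below is taken from the mw_external_oauth.go hunk:

    type introspectionCache struct {
        *storage.RedisCluster
    }

    func newIntrospectionCache(gw *Gateway) *introspectionCache {
        conn := &storage.RedisCluster{KeyPrefix: "introspection-", ConnectionHandler: gw.StorageConnectionHandler}
        conn.Connect()
        return &introspectionCache{RedisCluster: conn}
    }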