Skip to content

Commit

Permalink
Fix redis connections and remote pidfilelocation usage internal
Browse files Browse the repository at this point in the history
  • Loading branch information
Tit Petric committed Jan 8, 2025
1 parent f4e4130 commit a7ca7df
Show file tree
Hide file tree
Showing 13 changed files with 101 additions and 27 deletions.
11 changes: 9 additions & 2 deletions gateway/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2236,13 +2236,16 @@ func (gw *Gateway) createOauthClient(w http.ResponseWriter, r *http.Request) {
storageManager := gw.getGlobalMDCBStorageHandler(prefix, false)
storageManager.Connect()

storageDriver := &storage.RedisCluster{KeyPrefix: prefix, HashKeys: false, ConnectionHandler: gw.StorageConnectionHandler}
storageDriver.Connect()

apiSpec.OAuthManager = &OAuthManager{
OsinServer: gw.TykOsinNewServer(
&osin.ServerConfig{},
&RedisOsinStorageInterface{
storageManager,
gw.GlobalSessionManager,
&storage.RedisCluster{KeyPrefix: prefix, HashKeys: false, ConnectionHandler: gw.StorageConnectionHandler},
storageDriver,
apiSpec.OrgID,
gw,
}),
Expand Down Expand Up @@ -2623,12 +2626,16 @@ func (gw *Gateway) getOauthClientDetails(keyName, apiID string) (interface{}, in
prefix := generateOAuthPrefix(apiSpec.APIID)
storageManager := gw.getGlobalMDCBStorageHandler(prefix, false)
storageManager.Connect()

storageDriver := &storage.RedisCluster{KeyPrefix: prefix, HashKeys: false, ConnectionHandler: gw.StorageConnectionHandler}
storageDriver.Connect()

apiSpec.OAuthManager = &OAuthManager{
OsinServer: gw.TykOsinNewServer(&osin.ServerConfig{},
&RedisOsinStorageInterface{
storageManager,
gw.GlobalSessionManager,
&storage.RedisCluster{KeyPrefix: prefix, HashKeys: false, ConnectionHandler: gw.StorageConnectionHandler},
storageDriver,
apiSpec.OrgID,
gw,
}),
Expand Down
2 changes: 2 additions & 0 deletions gateway/delete_api_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,7 @@ import (

func (gw *Gateway) invalidateAPICache(apiID string) bool {
store := storage.RedisCluster{IsCache: true, ConnectionHandler: gw.StorageConnectionHandler}
store.Connect()

return store.DeleteScanMatch(fmt.Sprintf("cache-%s*", apiID))
}
26 changes: 26 additions & 0 deletions gateway/gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -945,7 +945,10 @@ func TestCacheAllSafeRequests(t *testing.T) {
t.Skip() // DeleteScanMatch interferes with other tests.

ts := StartTest(nil)

cache := storage.RedisCluster{KeyPrefix: "cache-", ConnectionHandler: ts.Gw.StorageConnectionHandler}
cache.Connect()

t.Cleanup(func() {
ts.Close()
cache.DeleteScanMatch("*")
Expand Down Expand Up @@ -975,7 +978,10 @@ func TestCacheAllSafeRequestsWithCachedHeaders(t *testing.T) {
t.Skip() // DeleteScanMatch interferes with other tests.

ts := StartTest(nil)

cache := storage.RedisCluster{KeyPrefix: "cache-", ConnectionHandler: ts.Gw.StorageConnectionHandler}
cache.Connect()

t.Cleanup(func() {
ts.Close()
cache.DeleteScanMatch("*")
Expand Down Expand Up @@ -1021,7 +1027,10 @@ func TestCacheWithAdvanceUrlRewrite(t *testing.T) {
t.Skip() // DeleteScanMatch interferes with other tests.

ts := StartTest(nil)

cache := storage.RedisCluster{KeyPrefix: "cache-", ConnectionHandler: ts.Gw.StorageConnectionHandler}
cache.Connect()

t.Cleanup(func() {
ts.Close()
cache.DeleteScanMatch("*")
Expand Down Expand Up @@ -1080,7 +1089,10 @@ func TestCachePostRequest(t *testing.T) {
t.Skip() // DeleteScanMatch interferes with other tests.

ts := StartTest(nil)

cache := storage.RedisCluster{KeyPrefix: "cache-", ConnectionHandler: ts.Gw.StorageConnectionHandler}
cache.Connect()

t.Cleanup(func() {
ts.Close()
cache.DeleteScanMatch("*")
Expand Down Expand Up @@ -1127,7 +1139,10 @@ func TestAdvanceCachePutRequest(t *testing.T) {
t.Skip() // DeleteScanMatch interferes with other tests.

ts := StartTest(nil)

cache := storage.RedisCluster{KeyPrefix: "cache-", ConnectionHandler: ts.Gw.StorageConnectionHandler}
cache.Connect()

t.Cleanup(func() {
ts.Close()
cache.DeleteScanMatch("*")
Expand Down Expand Up @@ -1220,7 +1235,10 @@ func TestCacheAllSafeRequestsWithAdvancedCacheEndpoint(t *testing.T) {
t.Skip() // DeleteScanMatch interferes with other tests.

ts := StartTest(nil)

cache := storage.RedisCluster{KeyPrefix: "cache-", ConnectionHandler: ts.Gw.StorageConnectionHandler}
cache.Connect()

t.Cleanup(func() {
ts.Close()
cache.DeleteScanMatch("*")
Expand Down Expand Up @@ -1259,7 +1277,9 @@ func TestCacheEtag(t *testing.T) {
t.Skip() // DeleteScanMatch interferes with other tests.

ts := StartTest(nil)

cache := storage.RedisCluster{KeyPrefix: "cache-", ConnectionHandler: ts.Gw.StorageConnectionHandler}
cache.Connect()

t.Cleanup(func() {
ts.Close()
Expand Down Expand Up @@ -1314,7 +1334,10 @@ func TestOldCachePlugin(t *testing.T) {
t.Helper()

ts := StartTest(nil)

cache := storage.RedisCluster{KeyPrefix: "cache-", ConnectionHandler: ts.Gw.StorageConnectionHandler}
cache.Connect()

t.Cleanup(func() {
ts.Close()
cache.DeleteScanMatch("*")
Expand Down Expand Up @@ -1343,7 +1366,10 @@ func TestAdvanceCacheTimeoutPerEndpoint(t *testing.T) {
t.Skip() // DeleteScanMatch interferes with other tests.

ts := StartTest(nil)

cache := storage.RedisCluster{KeyPrefix: "cache-", ConnectionHandler: ts.Gw.StorageConnectionHandler}
cache.Connect()

t.Cleanup(func() {
ts.Close()
cache.DeleteScanMatch("*")
Expand Down
1 change: 1 addition & 0 deletions gateway/health_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ func (gw *Gateway) gatherHealthChecks() {
allInfos := SafeHealthCheck{info: make(map[string]HealthCheckItem, 3)}

redisStore := storage.RedisCluster{KeyPrefix: "livenesscheck-", ConnectionHandler: gw.StorageConnectionHandler}
redisStore.Connect()

key := "tyk-liveness-probe"

Expand Down
17 changes: 17 additions & 0 deletions gateway/host_checker_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ func TestHostCheckerManagerInit(t *testing.T) {
defer ts.Close()

hc := HostCheckerManager{Gw: ts.Gw}

redisStorage := &storage.RedisCluster{KeyPrefix: "host-checker-test:", ConnectionHandler: ts.Gw.StorageConnectionHandler}
redisStorage.Connect()

hc.Init(redisStorage)

if hc.Id == "" {
Expand Down Expand Up @@ -46,6 +49,8 @@ func TestAmIPolling(t *testing.T) {
ts.Gw.SetConfig(globalConf)

redisStorage := &storage.RedisCluster{KeyPrefix: "host-checker-test:", ConnectionHandler: ts.Gw.StorageConnectionHandler}
redisStorage.Connect()

hc.Init(redisStorage)
hc2 := HostCheckerManager{Gw: ts.Gw}
hc2.Init(redisStorage)
Expand Down Expand Up @@ -74,7 +79,10 @@ func TestAmIPolling(t *testing.T) {

//Testing if the PollerCacheKey doesn't contains the poller_group by default
hc = HostCheckerManager{Gw: ts.Gw}

redisStorage = &storage.RedisCluster{KeyPrefix: "host-checker-test:", ConnectionHandler: ts.Gw.StorageConnectionHandler}
redisStorage.Connect()

hc.Init(redisStorage)
hc.AmIPolling()

Expand Down Expand Up @@ -106,7 +114,10 @@ func TestCheckActivePollerLoop(t *testing.T) {
defer ts.Close()

hc := &HostCheckerManager{Gw: ts.Gw}

redisStorage := &storage.RedisCluster{KeyPrefix: "host-checker-test-1:", ConnectionHandler: ts.Gw.StorageConnectionHandler}
redisStorage.Connect()

hc.Init(redisStorage)

go hc.CheckActivePollerLoop(ts.Gw.ctx)
Expand All @@ -122,7 +133,10 @@ func TestStartPoller(t *testing.T) {
defer ts.Close()

hc := HostCheckerManager{Gw: ts.Gw}

redisStorage := &storage.RedisCluster{KeyPrefix: "host-checker-TestStartPoller:", ConnectionHandler: ts.Gw.StorageConnectionHandler}
redisStorage.Connect()

hc.Init(redisStorage)

hc.StartPoller(ts.Gw.ctx)
Expand All @@ -135,9 +149,12 @@ func TestStartPoller(t *testing.T) {
func TestRecordUptimeAnalytics(t *testing.T) {
ts := StartTest(nil)
defer ts.Close()

hc := &HostCheckerManager{Gw: ts.Gw}

redisStorage := &storage.RedisCluster{KeyPrefix: "host-checker-test-analytics:", ConnectionHandler: ts.Gw.StorageConnectionHandler}
redisStorage.Connect()

hc.Init(redisStorage)

spec := &APISpec{}
Expand Down
5 changes: 4 additions & 1 deletion gateway/mw_external_oauth.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,10 @@ func isExpired(claims jwt.MapClaims) bool {
}

func newIntrospectionCache(gw *Gateway) *introspectionCache {
return &introspectionCache{RedisCluster: storage.RedisCluster{KeyPrefix: "introspection-", ConnectionHandler: gw.StorageConnectionHandler}}
conn := storage.RedisCluster{KeyPrefix: "introspection-", ConnectionHandler: gw.StorageConnectionHandler}
conn.Connect()

return &introspectionCache{RedisCluster: conn}
}

type introspectionCache struct {
Expand Down
6 changes: 5 additions & 1 deletion gateway/mw_jwt.go
Original file line number Diff line number Diff line change
Expand Up @@ -579,12 +579,16 @@ func (k *JWTMiddleware) processCentralisedJWT(r *http.Request, token *jwt.Token)
prefix := generateOAuthPrefix(k.Spec.APIID)
storageManager := k.Gw.getGlobalMDCBStorageHandler(prefix, false)
storageManager.Connect()

storageDriver := &storage.RedisCluster{KeyPrefix: prefix, HashKeys: false, ConnectionHandler: k.Gw.StorageConnectionHandler}
storageDriver.Connect()

k.Spec.OAuthManager = &OAuthManager{
OsinServer: k.Gw.TykOsinNewServer(&osin.ServerConfig{},
&RedisOsinStorageInterface{
storageManager,
k.Gw.GlobalSessionManager,
&storage.RedisCluster{KeyPrefix: prefix, HashKeys: false, ConnectionHandler: k.Gw.StorageConnectionHandler},
storageDriver,
k.Spec.OrgID,
k.Gw,
}),
Expand Down
8 changes: 6 additions & 2 deletions gateway/mw_oauth2_auth_ee.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,13 @@ func getUpstreamOAuthMw(base *BaseMiddleware) TykMiddleware {
}

func getClientCredentialsStorageHandler(base *BaseMiddleware) *storage.RedisCluster {
return &storage.RedisCluster{KeyPrefix: "upstreamOAuthCC-", ConnectionHandler: base.Gw.StorageConnectionHandler}
handler := &storage.RedisCluster{KeyPrefix: "upstreamOAuthCC-", ConnectionHandler: base.Gw.StorageConnectionHandler}
handler.Connect()
return handler
}

func getPasswordStorageHandler(base *BaseMiddleware) *storage.RedisCluster {
return &storage.RedisCluster{KeyPrefix: "upstreamOAuthPW-", ConnectionHandler: base.Gw.StorageConnectionHandler}
handler := &storage.RedisCluster{KeyPrefix: "upstreamOAuthPW-", ConnectionHandler: base.Gw.StorageConnectionHandler}
handler.Connect()
return handler
}
1 change: 1 addition & 0 deletions gateway/oauth_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -1195,6 +1195,7 @@ func (gw *Gateway) purgeLapsedOAuthTokens() error {
}

redisCluster := &storage.RedisCluster{KeyPrefix: "", HashKeys: false, ConnectionHandler: gw.StorageConnectionHandler}
redisCluster.Connect()

ok, err := redisCluster.Lock("oauth-purge-lock", time.Minute)
if err != nil {
Expand Down
5 changes: 3 additions & 2 deletions gateway/redis_logrus_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ type redisChannelHook struct {
func (gw *Gateway) newRedisHook() *redisChannelHook {
hook := &redisChannelHook{}
hook.formatter = new(logrus.JSONFormatter)
hook.notifier.store = &storage.RedisCluster{KeyPrefix: "gateway-notifications:", ConnectionHandler: gw.StorageConnectionHandler}
hook.notifier.channel = "dashboard.ui.messages"
hook.notifier.store = &storage.RedisCluster{KeyPrefix: "gateway-notifications:", ConnectionHandler: gw.StorageConnectionHandler}
hook.notifier.store.Connect()
return hook
}

func (hook *redisChannelHook) Fire(entry *logrus.Entry) error {

orgId, found := entry.Data["org_id"]
if !found {
return nil
Expand All @@ -44,6 +44,7 @@ func (hook *redisChannelHook) Fire(entry *logrus.Entry) error {
}

go hook.notifier.Notify(n)

return nil
}

Expand Down
9 changes: 5 additions & 4 deletions gateway/rpc_backup_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@ func (gw *Gateway) LoadDefinitionsFromRPCBackup() ([]*APISpec, error) {
tagList := getTagListAsString(gw.GetConfig().DBAppConfOptions.Tags)
checkKey := BackupApiKeyBase + tagList

store := storage.RedisCluster{KeyPrefix: RPCKeyPrefix, ConnectionHandler: gw.StorageConnectionHandler}
store := &storage.RedisCluster{KeyPrefix: RPCKeyPrefix, ConnectionHandler: gw.StorageConnectionHandler}
connected := store.Connect()

log.Info("[RPC] --> Loading API definitions from backup")

if !connected {
Expand Down Expand Up @@ -60,7 +61,7 @@ func (gw *Gateway) saveRPCDefinitionsBackup(list string) error {

log.Info("--> Connecting to DB")

store := storage.RedisCluster{KeyPrefix: RPCKeyPrefix, ConnectionHandler: gw.StorageConnectionHandler}
store := &storage.RedisCluster{KeyPrefix: RPCKeyPrefix, ConnectionHandler: gw.StorageConnectionHandler}
connected := store.Connect()

log.Info("--> Connected to DB")
Expand All @@ -83,9 +84,9 @@ func (gw *Gateway) LoadPoliciesFromRPCBackup() (map[string]user.Policy, error) {
tagList := getTagListAsString(gw.GetConfig().DBAppConfOptions.Tags)
checkKey := BackupPolicyKeyBase + tagList

store := storage.RedisCluster{KeyPrefix: RPCKeyPrefix, ConnectionHandler: gw.StorageConnectionHandler}

store := &storage.RedisCluster{KeyPrefix: RPCKeyPrefix, ConnectionHandler: gw.StorageConnectionHandler}
connected := store.Connect()

log.Info("[RPC] Loading Policies from backup")

if !connected {
Expand Down
2 changes: 1 addition & 1 deletion gateway/rpc_storage_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func (r *RPCStorageHandler) buildNodeInfo() []byte {
intCheckDuration = int64(checkDuration / time.Second)
}

r.Gw.getHostDetails(r.Gw.GetConfig().PIDFileLocation)
r.Gw.getHostDetails()
node := model.NodeData{
NodeID: r.Gw.GetNodeID(),
GroupID: config.SlaveOptions.GroupID,
Expand Down
Loading

0 comments on commit a7ca7df

Please sign in to comment.