Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions apisix/discovery/kubernetes/informer_factory.lua
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ local function list_watch(informer, apiserver)

core.log.info("begin to list ", informer.kind)
informer.fetch_state = "listing"
if informer.pre_List then
if informer.pre_list then
informer:pre_list()
end

Expand All @@ -298,7 +298,7 @@ local function list_watch(informer, apiserver)
end

informer.fetch_state = "list finished"
if informer.post_List then
if informer.post_list then
informer:post_list()
end

Expand Down
35 changes: 28 additions & 7 deletions apisix/discovery/kubernetes/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ local function sort_nodes_cmp(left, right)
return left.port < right.port
end

local function on_endpoint_slices_modified(handle, endpoint)
local function on_endpoint_slices_modified(handle, endpoint, operate)
if handle.namespace_selector and
not handle:namespace_selector(endpoint.metadata.namespace) then
return
Expand Down Expand Up @@ -114,10 +114,15 @@ local function on_endpoint_slices_modified(handle, endpoint)
if err then
core.log.error("set endpoint into discovery DICT failed, ", err)
handle.endpoint_dict:delete(endpoint_key .. "#version")
return
end
if operate == "list" then
handle.current_keys_hash[endpoint_key] = true
handle.current_keys_hash[endpoint_key .. "#version"] = true
end
end

local function on_endpoint_modified(handle, endpoint)
local function on_endpoint_modified(handle, endpoint, operate)
if handle.namespace_selector and
not handle:namespace_selector(endpoint.metadata.namespace) then
return
Expand Down Expand Up @@ -177,6 +182,11 @@ local function on_endpoint_modified(handle, endpoint)
if err then
core.log.error("set endpoint into discovery DICT failed, ", err)
handle.endpoint_dict:delete(endpoint_key .. "#version")
return
end
if operate == "list" then
handle.current_keys_hash[endpoint_key] = true
handle.current_keys_hash[endpoint_key .. "#version"] = true
end
end

Expand All @@ -195,12 +205,23 @@ end


local function pre_list(handle)
handle.endpoint_dict:flush_all()
handle.current_keys_hash = {}
handle.existing_keys = handle.endpoint_dict:get_keys(0)
end


local function post_list(handle)
handle.endpoint_dict:flush_expired()
if not handle.existing_keys or not handle.current_keys_hash then
return
end
for _, key in ipairs(handle.existing_keys) do
if not handle.current_keys_hash[key] then
core.log.info("kubernetes discovery module find dirty data in shared dict, key:", key)
handle.endpoint_dict:delete(key)
end
end
handle.existing_keys = nil
handle.current_keys_hash = nil
end


Expand Down Expand Up @@ -369,7 +390,7 @@ local function get_apiserver(conf)
end

local function create_endpoint_lrucache(endpoint_dict, endpoint_key, endpoint_port)
local endpoint_content = endpoint_dict:get_stale(endpoint_key)
local endpoint_content = endpoint_dict:get(endpoint_key)
if not endpoint_content then
core.log.error("get empty endpoint content from discovery DIC, this should not happen ",
endpoint_key)
Expand Down Expand Up @@ -497,7 +518,7 @@ local function single_mode_nodes(service_name)
local endpoint_dict = ctx
local endpoint_key = match[1]
local endpoint_port = match[2]
local endpoint_version = endpoint_dict:get_stale(endpoint_key .. "#version")
local endpoint_version = endpoint_dict:get(endpoint_key .. "#version")
if not endpoint_version then
core.log.info("get empty endpoint version from discovery DICT ", endpoint_key)
return nil
Expand Down Expand Up @@ -612,7 +633,7 @@ local function multiple_mode_nodes(service_name)

local endpoint_key = match[2]
local endpoint_port = match[3]
local endpoint_version = endpoint_dict:get_stale(endpoint_key .. "#version")
local endpoint_version = endpoint_dict:get(endpoint_key .. "#version")
if not endpoint_version then
core.log.info("get empty endpoint version from discovery DICT ", endpoint_key)
return nil
Expand Down
137 changes: 137 additions & 0 deletions t/kubernetes/discovery/kubernetes3.t
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,13 @@ _EOC_
}
}

location /t {
content_by_lua_block {
ngx.sleep(2)
ngx.exit(200)
}
}

_EOC_

$block->set_value("config", $config);
Expand Down Expand Up @@ -493,3 +500,133 @@ GET /dump
GET /dump
--- response_body_like
.*"name":"default/kubernetes".*



=== TEST 7: test pre_list and post_list work for single-k8s with endpoint_slices
--- log_level: info
--- yaml_config eval: $::single_yaml_config
--- extra_init_by_lua
local ngx = ngx
local core = require("apisix.core")

local dict = ngx.shared["kubernetes"]
local ok,err = dict:set("dirty_key", true)
if not ok then
core.log.error("set dirty_key to dict fail, err: ", err)
end
--- request
GET /t
--- no_error_log
[error]
--- grep_error_log eval
qr/kubernetes discovery module find dirty data in shared dict/
--- grep_error_log_out
kubernetes discovery module find dirty data in shared dict



=== TEST 8: test pre_list and post_list work for multi-k8s with endpoint_slices
--- log_level: info
--- yaml_config eval: $::yaml_config
--- extra_init_by_lua
local ngx = ngx
local core = require("apisix.core")

local dict = ngx.shared["kubernetes-first"]
local ok,err = dict:set("dirty_key", true)
if not ok then
core.log.error("set dirty_key to dict fail, err: ", err)
end
--- request
GET /t
--- no_error_log
[error]
--- grep_error_log eval
qr/kubernetes discovery module find dirty data in shared dict/
--- grep_error_log_out
kubernetes discovery module find dirty data in shared dict



=== TEST 9: test pre_list and post_list work for single-k8s with endpoints
--- log_level: info
--- yaml_config
apisix:
node_listen: 1984
deployment:
role: data_plane
role_data_plane:
config_provider: yaml
discovery:
kubernetes:
service:
host: "127.0.0.1"
port: "6443"
client:
token_file: "/tmp/var/run/secrets/kubernetes.io/serviceaccount/token"
watch_endpoint_slices: false
--- extra_init_by_lua
local ngx = ngx
local core = require("apisix.core")

local dict = ngx.shared["kubernetes"]
local ok,err = dict:set("dirty_key", true)
if not ok then
core.log.error("set dirty_key to dict fail, err: ", err)
end
--- request
GET /t
--- no_error_log
[error]
--- grep_error_log eval
qr/kubernetes discovery module find dirty data in shared dict/
--- grep_error_log_out
kubernetes discovery module find dirty data in shared dict



=== TEST 10: test pre_list and post_list work for multi-k8s with endpoints
--- log_level: info
--- yaml_config
apisix:
node_listen: 1984
deployment:
role: data_plane
role_data_plane:
config_provider: yaml
discovery:
kubernetes:
- id: first
service:
host: "127.0.0.1"
port: "6443"
client:
token_file: "/tmp/var/run/secrets/kubernetes.io/serviceaccount/token"
watch_endpoint_slices: false
- id: second
service:
schema: "http"
host: "127.0.0.1"
port: "6445"
client:
token_file: "/tmp/var/run/secrets/kubernetes.io/serviceaccount/token"
watch_endpoint_slices: false
--- extra_init_by_lua
local ngx = ngx
local core = require("apisix.core")

local dict = ngx.shared["kubernetes-first"]
local ok,err = dict:set("dirty_key", true)
if not ok then
core.log.error("set dirty_key to dict fail, err: ", err)
end
--- request
GET /t
--- no_error_log
[error]
--- grep_error_log eval
qr/kubernetes discovery module find dirty data in shared dict/
--- grep_error_log_out
kubernetes discovery module find dirty data in shared dict

Loading