92 lines
3.7 KiB
Lua
92 lines
3.7 KiB
Lua
-- Inter Worker Communication Protocol, designed for nginx for Windows but works for any OS
|
|
-- 3-3-2015 v0.3 beta http://nginx-win.ecsds.eu/
|
|
-- 0.1: initial design
|
|
-- 0.2: public release
|
|
-- 0.3: added lock, prevent re-entry while processing message
|
|
|
|
-- set only once, global vars, 8 workers * 5 seconds = +-40 second for all workers to handle a message
|
|
if not iwcp_delay then
|
|
-- worker wake up every 5 seconds
|
|
iwcp_delay = 5
|
|
-- expire messages after 60 seconds
|
|
iwcp_expire = 60
|
|
iwcp_debug = nil -- or 1 to enable
|
|
iwcp_show = 1
|
|
math.randomseed(os.time())
|
|
local unused = math.random(1000,9999)
|
|
end
|
|
-- local to worker
|
|
local handler
|
|
local worker_pid = tostring(ngx.worker.pid())
|
|
local upstream = require "ngx.upstream"
|
|
local log = ngx.log
|
|
local ERR = ngx.ERR
|
|
local NOTICE = ngx.NOTICE
|
|
local INFO = ngx.INFO
|
|
local WARN = ngx.WARN
|
|
local DEBUG = ngx.DEBUG
|
|
local Query = ngx.shared.iworkcomproto
|
|
local QResult, wmsg, keylock
|
|
local tvar1, keys, key
|
|
local from, to, err, i
|
|
local iwcp_one, iwcp_two, iwcp_tree
|
|
|
|
handler = function (premature)
|
|
if iwcp_debug then ngx.log(ngx.ERR, "timer activation for worker: "..worker_pid) end
|
|
|
|
keys = Query:get_keys(0)
|
|
for _,key in pairs(keys) do
|
|
-- peer message handling begin
|
|
from, to, err = ngx.re.find(key, "IWCP_MSG_P", "i")
|
|
if from then
|
|
QResult = Query:get(key)
|
|
tvar1 = "#"..worker_pid.."#"
|
|
from, to, err = ngx.re.find(QResult, tvar1, "i") -- is this worker PID in here?
|
|
-- if not true we have a message to deal with, add PID to msg when finished processing
|
|
if not from then -- and not (keylock == key) then
|
|
-- keylock = key -- if processing takes a while make sure we don't process same msg(key) before finishing
|
|
-- do we need to prevent re-entry or do we need to advice to pass long running jobs to a co-socket?
|
|
if iwcp_show then ngx.log(ngx.ERR, "message for worker: "..worker_pid..", key: "..key..", msg: "..QResult) end
|
|
from, to, err = ngx.re.find(QResult, "!", "i")
|
|
i = string.sub(QResult, from+1, string.len(QResult)) -- get message parameters
|
|
iwcp_one, iwcp_two, iwcp_tree = i:match("([^,]+),([^,]+),([^,]+)") -- split up parameters
|
|
-- all commands and values are in vars, run commands in this worker
|
|
from, to, err = ngx.re.find(key, "IWCP_MSG_PD_", "i") -- peer down
|
|
if from then
|
|
ok, err = upstream.set_peer_down(iwcp_one, false, tonumber(iwcp_two), true)
|
|
end
|
|
from, to, err = ngx.re.find(key, "IWCP_MSG_PU_", "i") -- peer up
|
|
if from then
|
|
ok, err = upstream.set_peer_down(iwcp_one, false, tonumber(iwcp_two), false)
|
|
end
|
|
from, to, err = ngx.re.find(key, "IWCP_MSG_PI_", "i") -- peer change addr(ip:port)
|
|
if from then
|
|
ok, err = upstream.set_peer_addr(iwcp_one, false, tonumber(iwcp_two), iwcp_tree)
|
|
end
|
|
-- log results
|
|
if iwcp_show then ngx.log(ngx.ERR, "result: "..tostring(ok)..", for worker: "..worker_pid..", err: "..tostring(err)..", key: "..key) end
|
|
-- update key
|
|
QResult = "#"..worker_pid..QResult
|
|
Query:replace(key,QResult,iwcp_expire)
|
|
-- keylock = nil
|
|
end
|
|
end
|
|
-- peer message handling end
|
|
end
|
|
|
|
if premature then
|
|
return
|
|
end
|
|
ok, err = ngx.timer.at(iwcp_delay, handler)
|
|
if not ok then
|
|
if iwcp_debug then ngx.log(ngx.ERR, "failed to create the timer: ", err) end
|
|
return
|
|
end
|
|
end
|
|
|
|
ok, err = ngx.timer.at(iwcp_delay, handler)
|
|
if not ok then
|
|
if iwcp_debug then ngx.log(ngx.ERR, "failed to create the timer: ", err) end
|
|
return
|
|
end
|