Add multiple waiters to file watch so we can monitor when each finishes
separately.
This commit is contained in:
parent
2a96f6d9eb
commit
971be49f5a
6 changed files with 130 additions and 92 deletions
|
@ -39,7 +39,7 @@ _M.Filename = _M.PoolDir .. _M.State[4]
|
|||
_M.StartPowerup = 0
|
||||
|
||||
_M.NeatConfig = {
|
||||
DisableSound = false,
|
||||
DisableSound = true,
|
||||
Threads = 7,
|
||||
--Filename = "DP1.state",
|
||||
SaveFile = _M.Filename .. ".pool",
|
||||
|
|
|
@ -51,6 +51,9 @@ end)
|
|||
pool.run():next(function()
|
||||
print("The pool finished running!!!")
|
||||
end):catch(function(error)
|
||||
if type(error) == "table" then
|
||||
error = "\n"..table.concat(error, "\n")
|
||||
end
|
||||
io.stderr:write(string.format("There was a problem running the pool: %s", error))
|
||||
print(string.format("There was a problem running the pool: %s", error))
|
||||
end)
|
||||
|
|
|
@ -139,7 +139,7 @@ local function waitLoop()
|
|||
waiter = Promise.new()
|
||||
waiter:resolve()
|
||||
else
|
||||
waiter = util.waitForFiles(inputFilePath)
|
||||
waiter = util.waitForFiles(inputFilePath)[1]
|
||||
end
|
||||
|
||||
-- Write the result
|
||||
|
@ -156,7 +156,7 @@ if util.isWin then
|
|||
waiter = Promise.new()
|
||||
waiter:resolve()
|
||||
else
|
||||
waiter = util.waitForFiles(inputFilePath)
|
||||
waiter = util.waitForFiles(inputFilePath)[1]
|
||||
end
|
||||
|
||||
local sec, usec = utime()
|
||||
|
|
|
@ -87,7 +87,7 @@ local function launchChildren(_M, count)
|
|||
APPDATA = settingsDir,
|
||||
}
|
||||
|
||||
local child = util.waitForFiles(outputFileName)
|
||||
local child = util.waitForFiles(outputFileName)[1]
|
||||
|
||||
local cmd = '"'.._M.hostProcess..'" "--rom='..config.ROM..'" --unpause "--lua='..base..'/runner-process.lua"'
|
||||
local poppet = util.popenCmd(cmd, nil, envs)
|
||||
|
@ -170,12 +170,11 @@ return function(promise)
|
|||
table.insert(outputFileNames, outputPrefix..i)
|
||||
end
|
||||
|
||||
local waiter = util.waitForFiles(outputFileNames, nil, tmpFileName.."_output_*")
|
||||
local waiters = util.waitForFiles(outputFileNames)
|
||||
|
||||
message(_M, 'Setting up child processes')
|
||||
|
||||
for i=1,#speciesSlice,1 do
|
||||
|
||||
local inputFileName = tmpFileName.."_input_"..i
|
||||
local inputFile = io.open(inputFileName, 'w')
|
||||
inputFile:write(serpent.dump({speciesSlice[i], generationIdx}))
|
||||
|
@ -184,54 +183,51 @@ return function(promise)
|
|||
|
||||
message(_M, 'Waiting for child processes to finish')
|
||||
|
||||
return waiter
|
||||
end):next(function()
|
||||
message(_M, 'Child processes finished')
|
||||
|
||||
local finished = 0
|
||||
for i=1,#speciesSlice,1 do
|
||||
message(_M, "Processing output "..i)
|
||||
local outputFileName = tmpFileName..'_output_'..i
|
||||
local outputFile = io.open(outputFileName, "r")
|
||||
local line = ""
|
||||
repeat
|
||||
local ok, obj = serpent.load(line)
|
||||
if not ok then
|
||||
goto continue
|
||||
end
|
||||
|
||||
if obj == nil then
|
||||
goto continue
|
||||
end
|
||||
|
||||
if obj.type == 'onMessage' then
|
||||
message(_M, obj.msg, obj.color)
|
||||
elseif obj.type == 'onLoad' then
|
||||
load(_M, obj.filename)
|
||||
elseif obj.type == 'onSave' then
|
||||
save(_M, obj.filename)
|
||||
elseif obj.type == 'onGenome' then
|
||||
for i=1,#speciesSlice,1 do
|
||||
local s = speciesSlice[i]
|
||||
if s.id == obj.speciesId then
|
||||
s.genomes[obj.genomeIndex] = obj.genome
|
||||
break
|
||||
end
|
||||
for i=1,#waiters,1 do
|
||||
waiters[i] = waiters[i]:next(function(outputFileName)
|
||||
message(_M, "Processing output "..i)
|
||||
local outputFile = io.open(outputFileName, "r")
|
||||
local line = ""
|
||||
repeat
|
||||
local ok, obj = serpent.load(line)
|
||||
if not ok then
|
||||
goto continue
|
||||
end
|
||||
genomeCallback(obj.genome, obj.index)
|
||||
elseif obj.type == 'onFinish' then
|
||||
finished = finished + 1
|
||||
if finished == #speciesSlice then
|
||||
outputFile:close()
|
||||
|
||||
if obj == nil then
|
||||
goto continue
|
||||
end
|
||||
|
||||
if obj.type == 'onMessage' then
|
||||
message(_M, obj.msg, obj.color)
|
||||
elseif obj.type == 'onLoad' then
|
||||
load(_M, obj.filename)
|
||||
elseif obj.type == 'onSave' then
|
||||
save(_M, obj.filename)
|
||||
elseif obj.type == 'onGenome' then
|
||||
for i=1,#speciesSlice,1 do
|
||||
local s = speciesSlice[i]
|
||||
if s.id == obj.speciesId then
|
||||
s.genomes[obj.genomeIndex] = obj.genome
|
||||
break
|
||||
end
|
||||
end
|
||||
genomeCallback(obj.genome, obj.index)
|
||||
elseif obj.type == 'onFinish' then
|
||||
message(_M, "Finished processing output "..i)
|
||||
return
|
||||
end
|
||||
end
|
||||
|
||||
::continue::
|
||||
line = outputFile:read()
|
||||
until(line == "" or line == nil)
|
||||
::continue::
|
||||
line = outputFile:read()
|
||||
until(line == "" or line == nil)
|
||||
error("Child process "..i.." never finished")
|
||||
end)
|
||||
end
|
||||
error(string.format("Some processes never finished? Saw %d terminations.", finished))
|
||||
|
||||
return Promise.all(table.unpack(waiters))
|
||||
end):next(function()
|
||||
message(_M, 'Child processes finished')
|
||||
end)
|
||||
end
|
||||
|
||||
|
|
104
util.lua
104
util.lua
|
@ -104,22 +104,11 @@ function _M.closeCmd(handle)
|
|||
end
|
||||
end
|
||||
|
||||
function _M.waitForFiles(filenames, count, wild)
|
||||
local promise = Promise.new()
|
||||
promise:resolve()
|
||||
|
||||
function _M.waitForFiles(filenames)
|
||||
if type(filenames) == 'string' then
|
||||
if wild == nil then
|
||||
wild = filenames
|
||||
end
|
||||
|
||||
filenames = {filenames}
|
||||
end
|
||||
|
||||
if count == nil then
|
||||
count = #filenames
|
||||
end
|
||||
|
||||
local poppet = nil
|
||||
if _M.isWin then
|
||||
local sec, usec = utime()
|
||||
|
@ -129,46 +118,75 @@ function _M.waitForFiles(filenames, count, wild)
|
|||
poppet = _M.popenCmd(cmd, base)
|
||||
|
||||
poppet:read("*l")
|
||||
else
|
||||
local watchCmd = ''
|
||||
if count == 1 then
|
||||
watchCmd = [[which inotifywait >/dev/null && { inotifywait -q -e close_write ']]..filenames[1]..[[' || exit 0 ; }]]
|
||||
else
|
||||
watchCmd = [[bash <<'EOF'
|
||||
COUNT=]]..count..[[
|
||||
FILENAMES=(']]..table.concat(filenames, "' '")..[[')
|
||||
declare -A SEEN
|
||||
((I = 0))
|
||||
set -m
|
||||
which inotifywait >/dev/null
|
||||
( inotifywait -q -m -e close_write "${FILENAMES[@]}" | while read LINE ; do
|
||||
SEEN["$LINE"]=1
|
||||
TOTAL=${#SEEN[@]}
|
||||
if ((TOTAL == COUNT)) ; then
|
||||
kill -s TERM 0
|
||||
fi
|
||||
done ) &
|
||||
wait
|
||||
EOF]]
|
||||
end
|
||||
poppet = _M.popenCmd(watchCmd)
|
||||
end
|
||||
|
||||
return promise:next(function()
|
||||
if _M.isWin then
|
||||
local waiters = {}
|
||||
for i=1,#filenames,1 do
|
||||
local waiter = Promise.new()
|
||||
table.insert(waiters, waiter)
|
||||
end
|
||||
|
||||
-- To defer the check of the files
|
||||
local promise = Promise.new()
|
||||
promise:resolve()
|
||||
promise:next(function()
|
||||
local i = 1
|
||||
while i <= count do
|
||||
while i <= filenames do
|
||||
local line = poppet:read("*l")
|
||||
for chr in line:gmatch(";") do
|
||||
i = i + 1
|
||||
end
|
||||
i = i + 1
|
||||
end
|
||||
else
|
||||
poppet:read("*a")
|
||||
_M.closeCmd(poppet)
|
||||
-- FIXME synchronous
|
||||
for i=1,#filenames,1 do
|
||||
waiters[i]:resolve(filenames[i])
|
||||
end
|
||||
end):catch(function(reason)
|
||||
for i=1,#filenames,1 do
|
||||
waiters[i]:reject(reason)
|
||||
end
|
||||
end)
|
||||
|
||||
return waiters
|
||||
else
|
||||
local watchCmd = [[bash ]]..base..[[/watch.sh ']]..table.concat(filenames, [[' ']])..[[']]
|
||||
poppet = _M.popenCmd(watchCmd)
|
||||
|
||||
local waiters = {}
|
||||
for i=1,#filenames,1 do
|
||||
local waiter = Promise.new()
|
||||
table.insert(waiters, waiter)
|
||||
end
|
||||
end)
|
||||
|
||||
local finished = 0
|
||||
local function waitLoop()
|
||||
local promise = Promise.new()
|
||||
promise:resolve()
|
||||
return promise:next(function()
|
||||
local line = poppet:read("*l")
|
||||
finished = finished + 1
|
||||
local filename = line:gsub('%s+[^%s]+$', "")
|
||||
for i=1,#filenames,1 do
|
||||
if filename == filenames[i] then
|
||||
waiters[i]:resolve(filenames[i])
|
||||
break
|
||||
end
|
||||
end
|
||||
|
||||
if finished ~= #filenames then
|
||||
return waitLoop()
|
||||
end
|
||||
end)
|
||||
end
|
||||
|
||||
waitLoop():catch(function(reason)
|
||||
for i=1,#waiters,1 do
|
||||
waiters:reject(reason)
|
||||
end
|
||||
end)
|
||||
|
||||
return waiters
|
||||
end
|
||||
end
|
||||
|
||||
function _M.table_to_string(tbl)
|
||||
|
|
21
watch.sh
Normal file
21
watch.sh
Normal file
|
@ -0,0 +1,21 @@
|
|||
#! /bin/bash
|
||||
FILENAMES=("$@")
|
||||
declare -A SEEN
|
||||
((I = 0))
|
||||
set -m
|
||||
which inotifywait >/dev/null
|
||||
function checker {
|
||||
inotifywait -q -m -e close_write "${FILENAMES[@]}" | while read LINE ; do
|
||||
if ! [ ${SEEN["$LINE"]+y} ] ; then
|
||||
SEEN["$LINE"]=1
|
||||
echo "$LINE"
|
||||
fi ;
|
||||
TOTAL=${#SEEN[@]}
|
||||
COUNT=$#
|
||||
if ((TOTAL == COUNT)) ; then
|
||||
kill -s TERM 0
|
||||
fi
|
||||
done
|
||||
}
|
||||
checker &
|
||||
wait
|
Loading…
Add table
Reference in a new issue