Handle threading in the wrapper

This commit is contained in:
Empathic Qubit 2021-05-13 05:55:09 -04:00
parent 097f1cbfb4
commit ed5e64c6b7
3 changed files with 41 additions and 25 deletions

View file

@ -729,14 +729,7 @@ local function mainLoop(currentSpecies, topGenome)
end end
if hasThreads then if hasThreads then
slice = {} slice = pool.species
for i=currentSpecies, currentSpecies + config.NeatConfig.Threads - 1, 1 do
if pool.species[i] == nil then
break
end
table.insert(slice, pool.species[i])
end
end end
return runner.run( return runner.run(

View file

@ -40,6 +40,14 @@ local function writeResponse(object)
outputPipe:flush() outputPipe:flush()
end end
local function unblockLoop()
return util.delay(1000000):next(function()
outputPipe:write(".\n")
outputPipe:flush()
return unblockLoop()
end)
end
local runner = Runner(Promise) local runner = Runner(Promise)
runner.onMessage(function(msg, color) runner.onMessage(function(msg, color)
statusLine = msg statusLine = msg
@ -147,7 +155,9 @@ writeResponse({ type = 'onInit', ts = ts })
print(string.format('Wrote init to output at %d', ts)) print(string.format('Wrote init to output at %d', ts))
waiter:next(waitLoop):catch(function(error) waiter:next(function(inputLine)
return waitLoop(inputLine)
end):catch(function(error)
if type(error) == "table" then if type(error) == "table" then
error = "\n"..table.concat(error, "\n") error = "\n"..table.concat(error, "\n")
end end

View file

@ -158,22 +158,14 @@ return function(promise)
onReset(_M, handler) onReset(_M, handler)
end end
_M.run = function(speciesSlice, generationIdx, genomeCallback) _M.run = function(species, generationIdx, genomeCallback)
local promise = Promise.new() local promise = Promise.new()
promise:resolve() promise:resolve()
return promise:next(function() return promise:next(function()
return launchChildren(_M, #speciesSlice) return launchChildren(_M, config.NeatConfig.Threads)
end):next(function() end):next(function()
message(_M, 'Setting up child processes') message(_M, 'Setting up child processes')
for i=1,#speciesSlice,1 do
local inputPipe = _M.poppets[i].input
inputPipe:write(serpent.dump({speciesSlice[i], generationIdx}).."\n")
inputPipe:flush()
end
message(_M, 'Waiting for child processes to finish')
local maxFitness = nil local maxFitness = nil
local function readLoop(outputPipe) local function readLoop(outputPipe)
return util.promiseWrap(function() return util.promiseWrap(function()
@ -201,8 +193,8 @@ return function(promise)
elseif obj.type == 'onReset' then elseif obj.type == 'onReset' then
reset(_M) reset(_M)
elseif obj.type == 'onGenome' then elseif obj.type == 'onGenome' then
for i=1,#speciesSlice,1 do for i=1,#species,1 do
local s = speciesSlice[i] local s = species[i]
if s.id == obj.speciesId then if s.id == obj.speciesId then
message(_M, string.format('Write Species %d Genome %d', obj.speciesId, obj.genomeIndex)) message(_M, string.format('Write Species %d Genome %d', obj.speciesId, obj.genomeIndex))
s.genomes[obj.genomeIndex] = obj.genome s.genomes[obj.genomeIndex] = obj.genome
@ -227,12 +219,33 @@ return function(promise)
end end
local waiters = {} local waiters = {}
for i=1,#speciesSlice,1 do for t=1,config.NeatConfig.Threads,1 do
local outputPipe = _M.poppets[i].output waiters[t] = Promise.new()
local waiter = readLoop(outputPipe) waiters[t]:resolve()
table.insert(waiters, waiter)
end end
local currentSpecies = 1
while currentSpecies < #species do
for t=1,config.NeatConfig.Threads,1 do
local s = species[currentSpecies]
if s == nil then
break
end
waiters[t] = waiters[t]:next(function()
local inputPipe = _M.poppets[t].input
inputPipe:write(serpent.dump({s, generationIdx}).."\n")
inputPipe:flush()
local outputPipe = _M.poppets[t].output
return readLoop(outputPipe)
end)
currentSpecies = currentSpecies + 1
end
end
message(_M, 'Waiting for child processes to finish')
return Promise.all(table.unpack(waiters)) return Promise.all(table.unpack(waiters))
end):next(function(maxFitnesses) end):next(function(maxFitnesses)
message(_M, 'Child processes finished') message(_M, 'Child processes finished')