Named pipes in Windows. It is glorious.

This commit is contained in:
Empathic Qubit 2021-05-04 08:20:35 -04:00
parent 542682e126
commit 6c1c4fa0eb
8 changed files with 169 additions and 246 deletions

2
.gitignore vendored
View file

@ -12,4 +12,6 @@ config.lua
*.sfc *.sfc
*.bst *.bst
watchexec* watchexec*
namedpipe*
build/
.VSCodeCounter/ .VSCodeCounter/

View file

@ -126,8 +126,7 @@ end
local function processRewind() local function processRewind()
for i=#onRewindQueue,1,-1 do for i=#onRewindQueue,1,-1 do
local promise = table.remove(onRewindQueue, i) table.remove(onRewindQueue, i):resolve()
promise:resolve()
end end
end end

View file

@ -51,6 +51,9 @@ end)
pool.run():next(function() pool.run():next(function()
print("The pool finished running!!!") print("The pool finished running!!!")
end):catch(function(error) end):catch(function(error)
if type(error) == "table" then
error = "\n"..table.concat(error, "\n")
end
io.stderr:write(string.format("There was a problem running the pool: %s", error)) io.stderr:write(string.format("There was a problem running the pool: %s", error))
print(string.format("There was a problem running the pool: %s", error)) print(string.format("There was a problem running the pool: %s", error))
end) end)

View file

@ -10,7 +10,7 @@ local serpent = dofile(base.."/serpent.lua")
local libDeflate = dofile(base.."/LibDeflate.lua") local libDeflate = dofile(base.."/LibDeflate.lua")
local hasThreads = local hasThreads =
not util.isWin and --not util.isWin and
config.NeatConfig.Threads > 1 config.NeatConfig.Threads > 1
-- Only the parent should manage ticks! -- Only the parent should manage ticks!
@ -399,9 +399,7 @@ local function addToSpecies(child)
end end
local function initializePool() local function initializePool()
local promise = Promise.new() return util.promiseWrap(function()
promise:resolve()
return promise:next(function()
pool = newPool() pool = newPool()
for i=1,config.NeatConfig.Population do for i=1,config.NeatConfig.Population do
@ -420,9 +418,7 @@ local function bytes(x)
end end
local function writeFile(filename) local function writeFile(filename)
local promise = Promise.new() return util.promiseWrap(function ()
promise:resolve()
return promise:next(function ()
local file = io.open(filename, "w") local file = io.open(filename, "w")
local dump = serpent.dump(pool) local dump = serpent.dump(pool)
local zlib = libDeflate:CompressDeflate(dump) local zlib = libDeflate:CompressDeflate(dump)
@ -436,9 +432,7 @@ end
-- FIXME This isn't technically asynchronous. Probably can't be though. -- FIXME This isn't technically asynchronous. Probably can't be though.
local function loadFile(filename) local function loadFile(filename)
local promise = Promise.new() return util.promiseWrap(function()
promise:resolve()
return promise:next(function()
message("Loading pool from " .. filename, 0x00999900) message("Loading pool from " .. filename, 0x00999900)
local file = io.open(filename, "r") local file = io.open(filename, "r")
if file == nil then if file == nil then
@ -699,9 +693,7 @@ local function mainLoop(currentSpecies, topGenome)
end end
local slice = pool.species[currentSpecies] local slice = pool.species[currentSpecies]
local promise = Promise.new() return util.promiseWrap(function()
promise:resolve()
return promise:next(function()
if loadRequested then if loadRequested then
loadRequested = false loadRequested = false
currentSpecies = nil currentSpecies = nil

View file

@ -10,16 +10,10 @@ callback.register('timer', function()
end) end)
set_timer_timeout(1) set_timer_timeout(1)
local Runner = dofile(base.."/runner.lua") local Runner = dofile(base.."/runner.lua")
local serpent = dofile(base.."/serpent.lua") local serpent = dofile(base.."/serpent.lua")
local util = dofile(base.."/util.lua")(Promise) local util = dofile(base.."/util.lua")(Promise)
local inputFilePath = os.getenv("RUNNER_INPUT_FILE")
local outputFilePath = os.getenv("RUNNER_OUTPUT_FILE")
local outContents = {}
local statusLine = nil local statusLine = nil
local statusColor = 0x0000ff00 local statusColor = 0x0000ff00
@ -27,20 +21,40 @@ local species = nil
local speciesId = -1 local speciesId = -1
local generationIndex = nil local generationIndex = nil
local inputPipeName = os.getenv("RUNNER_INPUT_PIPE")
local outputFilePath = os.getenv("RUNNER_OUTPUT_FILE")
print('Opening input pipe '..inputPipeName)
local inputPipe = util.openReadPipe(inputPipeName)
if inputPipe == nil then
error('Error opening input file')
end
print('Opened input pipe '..inputPipeName)
print('Opening output file '..outputFilePath)
local outputFile = nil
while outputFile == nil do
outputFile = io.open(outputFilePath, 'w')
end
print('Opened output file '..outputFilePath)
local function writeResponse(object)
outputFile:write(serpent.dump(object).."\n")
outputFile:flush()
end
local runner = Runner(Promise) local runner = Runner(Promise)
runner.onMessage(function(msg, color) runner.onMessage(function(msg, color)
statusLine = msg statusLine = msg
statusColor = color statusColor = color
print(msg) print(msg)
table.insert(
outContents, writeResponse({
serpent.dump({
type = 'onMessage', type = 'onMessage',
speciesId = speciesId, speciesId = speciesId,
msg = msg, msg = msg,
color = color, color = color,
}) })
)
end) end)
runner.onRenderForm(function(form) runner.onRenderForm(function(form)
@ -60,38 +74,28 @@ runner.onRenderForm(function(form)
end) end)
runner.onSave(function(filename) runner.onSave(function(filename)
table.insert( writeResponse({
outContents,
serpent.dump({
type = 'onSave', type = 'onSave',
filename = filename, filename = filename,
speciesId = speciesId, speciesId = speciesId,
}) })
)
end) end)
runner.onLoad(function(filename) runner.onLoad(function(filename)
table.insert( writeResponse({
outContents,
serpent.dump({
type = 'onLoad', type = 'onLoad',
filename = filename, filename = filename,
speciesId = speciesId, speciesId = speciesId,
}) })
)
end) end)
local function waitLoop() local function waitLoop(inputLine)
local inputData = nil return util.promiseWrap(function()
local ok = false local ok, inputData = serpent.load(inputLine)
while not ok or inputData == nil or speciesId == inputData[1].id do
local inputFile = io.open(inputFilePath, 'r')
ok, inputData = serpent.load(inputFile:read('*a'))
inputFile:close()
if not ok then if not ok or inputData == nil or speciesId == inputData[1].id then
print("Deserialization error") print("Deserialization error")
end return
end end
print('Received input from master process') print('Received input from master process')
@ -102,73 +106,45 @@ local function waitLoop()
generationIndex = inputData[2] generationIndex = inputData[2]
outContents = {}
print('Running') print('Running')
return runner.run( return runner.run(
species, species,
generationIndex, generationIndex,
function(genome, index) function(genome, index)
table.insert( writeResponse({
outContents,
serpent.dump({
type = 'onGenome', type = 'onGenome',
genome = genome, genome = genome,
genomeIndex = index, genomeIndex = index,
speciesId = speciesId, speciesId = speciesId,
}) })
)
end end
):next(function() ):next(function()
table.insert( writeResponse({
outContents,
serpent.dump({
type = 'onFinish', type = 'onFinish',
speciesId = speciesId, speciesId = speciesId,
}) })
) end)
end):next(function()
-- Truncate the input file to reduce the amount of time return inputPipe:read("*l")
-- wasted if we reopen it too early
local inputFile = io.open(inputFilePath, "w")
inputFile:close()
local waiter = nil
if util.isWin then
waiter = Promise.new()
waiter:resolve()
else
waiter = util.waitForFiles(inputFilePath)
end
-- Write the result
local outFile = io.open(outputFilePath, "w")
outFile:write(table.concat(outContents, "\n"))
outFile:close()
return waiter
end):next(waitLoop) end):next(waitLoop)
end end
local waiter = nil
if util.isWin then
waiter = Promise.new()
waiter:resolve()
else
waiter = util.waitForFiles(inputFilePath)
end
local sec, usec = utime() local sec, usec = utime()
local ts = sec * 1000000 + usec local ts = sec * 1000000 + usec
local outFile = io.open(outputFilePath, "w") local waiter = util.promiseWrap(function()
outFile:write(serpent.dump({ type = 'onInit', ts = ts })) return inputPipe:read("*l")
outFile:close() end)
writeResponse({ type = 'onInit', ts = ts })
print(string.format('Wrote init to output at %d', ts)) print(string.format('Wrote init to output at %d', ts))
waiter:next(waitLoop):catch(function(error) waiter:next(waitLoop):catch(function(error)
if type(error) == "table" then
error = "\n"..table.concat(error, "\n")
end
print('Runner process error: '..error) print('Runner process error: '..error)
io.stderr:write('Runner process error: '..error..'\n') io.stderr:write('Runner process error: '..error..'\n')
end) end)

View file

@ -23,12 +23,12 @@ for i=1,#temps,1 do
end end
end end
local tmpFileName = tempDir.."/donk_runner_".. local pipePrefix = "donk_runner_"..
string.hex(math.floor(random.integer(0, 0xffffffff))).. string.hex(math.floor(random.integer(0, 0xffffffff)))..
string.hex(math.floor(random.integer(0, 0xffffffff))) string.hex(math.floor(random.integer(0, 0xffffffff)))
local inputPrefix = tmpFileName..'_input_' local inputPrefix = pipePrefix..'_input_'
local outputPrefix = tmpFileName..'_output_' local outputPrefix = pipePrefix..'_output_'
local function message(_M, msg, color) local function message(_M, msg, color)
if color == nil then if color == nil then
@ -69,11 +69,21 @@ end
---@param count integer Number of processes needed ---@param count integer Number of processes needed
---@return Promise Promise A promise that resolves when all the processes are ready ---@return Promise Promise A promise that resolves when all the processes are ready
local function launchChildren(_M, count) local function launchChildren(_M, count)
local children = {} local promises = {}
while #_M.poppets < count do for i=#_M.poppets+1,count,1 do
local i = #_M.poppets+1 local newOne = {
process = nil,
output = util.openReadPipe(outputPrefix..i),
input = nil,
}
local outputFileName = outputPrefix..i local outputFileName = outputPrefix..i
local inputPipeName = inputPrefix..i
local inputFileName = inputPrefix..i local inputFileName = inputPrefix..i
if util.isWin then
outputFileName = '\\\\.\\pipe\\'..outputFileName
inputFileName = '\\\\.\\pipe\\'..inputPipeName
end
local settingsDir = nil local settingsDir = nil
if util.isWin then if util.isWin then
@ -82,21 +92,26 @@ local function launchChildren(_M, count)
end end
local envs = { local envs = {
RUNNER_INPUT_FILE = inputFileName, RUNNER_INPUT_PIPE = inputPipeName,
RUNNER_OUTPUT_FILE = outputFileName, RUNNER_OUTPUT_FILE = outputFileName,
APPDATA = settingsDir, APPDATA = settingsDir,
} }
local child = util.waitForFiles(outputFileName)
local cmd = '"'.._M.hostProcess..'" "--rom='..config.ROM..'" --unpause "--lua='..base..'/runner-process.lua"' local cmd = '"'.._M.hostProcess..'" "--rom='..config.ROM..'" --unpause "--lua='..base..'/runner-process.lua"'
local poppet = util.popenCmd(cmd, nil, envs) newOne.process = util.popenCmd(cmd, nil, envs)
table.insert(_M.poppets, poppet)
table.insert(children, child) -- Wait for init
local promise = util.promiseWrap(function()
newOne.output:read("*l")
while newOne.input == nil do
newOne.input = io.open(inputFileName, 'w')
end
end)
table.insert(promises, promise)
table.insert(_M.poppets, newOne)
end end
return Promise.all(table.unpack(children)) return Promise.all(table.unpack(promises))
end end
return function(promise) return function(promise)
@ -107,9 +122,9 @@ return function(promise)
end end
-- FIXME Maybe don't do this in the "constructor"? -- FIXME Maybe don't do this in the "constructor"?
if util.isWin then if util.isWin then
util.downloadFile('https://github.com/watchexec/watchexec/releases/download/1.13.1/watchexec-1.13.1-x86_64-pc-windows-gnu.zip', base..'/watchexec.zip') util.downloadFile("https://github.com/psmay/windows-named-pipe-utils/releases/download/v0.1.1/build.zip", base.."/namedpipe.zip")
util.unzip(base..'/watchexec.zip', base) util.unzip(base.."/namedpipe.zip", base)
os.rename(base..'watchexec-1.13.1-x86_64-pc-windows-gnu', base..'/watchexec') os.rename(base.."/build", "namedpipe")
end end
local _M = { local _M = {
@ -152,56 +167,31 @@ return function(promise)
local promise = Promise.new() local promise = Promise.new()
promise:resolve() promise:resolve()
return promise:next(function() return promise:next(function()
-- Create the input files and output files
for i=1,#speciesSlice,1 do
local inputFileName = inputPrefix..i
local inputFile = io.open(inputFileName, 'a')
inputFile:close()
local outputFileName = outputPrefix..i
local outputFile = io.open(outputFileName, 'a')
outputFile:close()
end
return launchChildren(_M, #speciesSlice) return launchChildren(_M, #speciesSlice)
end):next(function() end):next(function()
local outputFileNames = {}
for i=1,#speciesSlice,1 do
table.insert(outputFileNames, outputPrefix..i)
end
local waiter = util.waitForFiles(outputFileNames, nil, tmpFileName.."_output_*")
message(_M, 'Setting up child processes') message(_M, 'Setting up child processes')
for i=1,#speciesSlice,1 do for i=1,#speciesSlice,1 do
local inputPipe = _M.poppets[i].input
local inputFileName = tmpFileName.."_input_"..i inputPipe:write(serpent.dump({speciesSlice[i], generationIdx}).."\n")
local inputFile = io.open(inputFileName, 'w') inputPipe:flush()
inputFile:write(serpent.dump({speciesSlice[i], generationIdx}))
inputFile:close()
end end
message(_M, 'Waiting for child processes to finish') message(_M, 'Waiting for child processes to finish')
return waiter local function readLoop(outputPipe, line)
end):next(function() return util.promiseWrap(function()
message(_M, 'Child processes finished') if line == nil or line == "" then
util.closeCmd(outputPipe)
end
local finished = 0
for i=1,#speciesSlice,1 do
message(_M, "Processing output "..i)
local outputFileName = tmpFileName..'_output_'..i
local outputFile = io.open(outputFileName, "r")
local line = ""
repeat
local ok, obj = serpent.load(line) local ok, obj = serpent.load(line)
if not ok then if not ok then
goto continue return false
end end
if obj == nil then if obj == nil then
goto continue return false
end end
if obj.type == 'onMessage' then if obj.type == 'onMessage' then
@ -220,18 +210,35 @@ return function(promise)
end end
genomeCallback(obj.genome, obj.index) genomeCallback(obj.genome, obj.index)
elseif obj.type == 'onFinish' then elseif obj.type == 'onFinish' then
finished = finished + 1 return true
if finished == #speciesSlice then
outputFile:close()
return
end
end end
::continue:: end):next(function(finished)
line = outputFile:read() if finished then
until(line == "" or line == nil) return
end end
error(string.format("Some processes never finished? Saw %d terminations.", finished))
local line = outputPipe:read("*l")
return readLoop(outputPipe, line)
end)
end
local waiters = {}
for i=1,#speciesSlice,1 do
local waiter = util.promiseWrap(function()
local outputPipe = _M.poppets[i].output
local line = outputPipe:read("*l")
print("Started receiving output from child process "..i)
return readLoop(outputPipe, line)
end)
table.insert(waiters, waiter)
end
return Promise.all(table.unpack(waiters))
end):next(function()
message(_M, 'Child processes finished')
end) end)
end end

View file

@ -1,4 +1,4 @@
local memory, movie, utime, callback, set_timer_timeout = memory, movie, utime, callback, set_timer_timeout local memory, movie, utime, callback, set_timer_timeout, input = memory, movie, utime, callback, set_timer_timeout, input
local base = string.gsub(@@LUA_SCRIPT_FILENAME@@, "(.*[/\\])(.*)", "%1") local base = string.gsub(@@LUA_SCRIPT_FILENAME@@, "(.*[/\\])(.*)", "%1")
local Promise = dofile(base.."/promise.lua") local Promise = dofile(base.."/promise.lua")
@ -9,10 +9,9 @@ end)
set_timer_timeout(1) set_timer_timeout(1)
local game = dofile(base.."/game.lua")(Promise) local game = dofile(base.."/game.lua")(Promise)
local util = dofile(base.."/util.lua")(Promise) local util = dofile(base.."/util.lua")(Promise)
local serpent = dofile(base.."/serpent.lua")
game.registerHandlers() local test = io.popen("cat > Z:\\UserProfiles\\EmpathicQubit\\testy.txt", 'w')
test:write("hello world\n")
game.findPreferredExit():next(function(exit) test:flush()
io.stderr:write(util.table_to_string(exit)) test:close()
io.stderr:write('\n')
end)

View file

@ -8,6 +8,12 @@ local _M = {}
_M.isWin = package.config:sub(1, 1) == '\\' _M.isWin = package.config:sub(1, 1) == '\\'
function _M.promiseWrap(next)
local promise = Promise.new()
promise:resolve()
return promise:next(next)
end
--- Echo a command, run it, and return the file handle --- Echo a command, run it, and return the file handle
--- @param cmd string The command to execute --- @param cmd string The command to execute
--- @param workdir string The working directory --- @param workdir string The working directory
@ -48,6 +54,13 @@ function _M.doCmd(...)
return _M.scrapeCmd('*a', ...) return _M.scrapeCmd('*a', ...)
end end
-- FIXME linux
function _M.openReadPipe(name)
local cmd = 'cd /d "'..base..'" && "'..base..'/namedpipe/createAndReadPipe.exe" "'..name..'"'
print(cmd)
return io.popen(cmd, 'r')
end
--- Download a url --- Download a url
--- @param url string URI of resource to download --- @param url string URI of resource to download
--- @param dest string File to save resource to --- @param dest string File to save resource to
@ -59,9 +72,8 @@ end
--- @param zipfile string The ZIP file path --- @param zipfile string The ZIP file path
--- @param dest string Where to unzip the ZIP file. Beware ZIP bombs. --- @param dest string Where to unzip the ZIP file. Beware ZIP bombs.
function _M.unzip (zipfile, dest) function _M.unzip (zipfile, dest)
return _M.doCmd('unzip "'..zipfile..'" -d "'..dest.. return _M.doCmd('tar -xvf "'..zipfile..'" 2>&1 || unzip -n "'..zipfile..'" -d "'..dest..
'" 2>&1 || tar -C "'..dest..'" -xvf "'..zipfile.. '" 2>&1', dest)
'" 2>&1', nil)
end end
--- Create a directory --- Create a directory
@ -100,77 +112,10 @@ function _M.closeCmd(handle)
return return
end end
if code ~= 0 then if code ~= 0 then
error("The last command failed") error(string.format("The last command failed: %s %d", state, code))
end end
end end
function _M.waitForFiles(filenames, count, wild)
local promise = Promise.new()
promise:resolve()
if type(filenames) == 'string' then
if wild == nil then
wild = filenames
end
filenames = {filenames}
end
if count == nil then
count = #filenames
end
local poppet = nil
if _M.isWin then
local sec, usec = utime()
print(string.format('Starting watching file at %d', sec * 1000000 + usec))
local cmd = '"'..base..'/watchexec/watchexec.exe" "-w" "'..table.concat(filenames, '" "-w" "')..'" "echo" "%WATCHEXEC_WRITTEN_PATH%"'
poppet = _M.popenCmd(cmd, base)
poppet:read("*l")
else
local watchCmd = ''
if count == 1 then
watchCmd = [[which inotifywait >/dev/null && { inotifywait -q -e close_write ']]..filenames[1]..[[' || exit 0 ; }]]
else
watchCmd = [[bash <<'EOF'
COUNT=]]..count..[[
FILENAMES=(']]..table.concat(filenames, "' '")..[[')
declare -A SEEN
((I = 0))
set -m
which inotifywait >/dev/null
( inotifywait -q -m -e close_write "${FILENAMES[@]}" | while read LINE ; do
SEEN["$LINE"]=1
TOTAL=${#SEEN[@]}
if ((TOTAL == COUNT)) ; then
kill -s TERM 0
fi
done ) &
wait
EOF]]
end
poppet = _M.popenCmd(watchCmd)
end
return promise:next(function()
if _M.isWin then
local i = 1
while i <= count do
local line = poppet:read("*l")
for chr in line:gmatch(";") do
i = i + 1
end
i = i + 1
end
else
poppet:read("*a")
_M.closeCmd(poppet)
end
end)
end
function _M.table_to_string(tbl) function _M.table_to_string(tbl)
local result = "{" local result = "{"
local keys = {} local keys = {}