diff --git a/.gitignore b/.gitignore index 03eefcb..8dc493d 100644 --- a/.gitignore +++ b/.gitignore @@ -12,4 +12,6 @@ config.lua *.sfc *.bst watchexec* +namedpipe* +build/ .VSCodeCounter/ diff --git a/game.lua b/game.lua index c78a9fb..152192f 100644 --- a/game.lua +++ b/game.lua @@ -126,8 +126,7 @@ end local function processRewind() for i=#onRewindQueue,1,-1 do - local promise = table.remove(onRewindQueue, i) - promise:resolve() + table.remove(onRewindQueue, i):resolve() end end diff --git a/neat-donk.lua b/neat-donk.lua index 4b1b269..8a1187d 100644 --- a/neat-donk.lua +++ b/neat-donk.lua @@ -51,6 +51,9 @@ end) pool.run():next(function() print("The pool finished running!!!") end):catch(function(error) + if type(error) == "table" then + error = "\n"..table.concat(error, "\n") + end io.stderr:write(string.format("There was a problem running the pool: %s", error)) print(string.format("There was a problem running the pool: %s", error)) end) diff --git a/pool.lua b/pool.lua index 9fdfee7..7ad4bb6 100644 --- a/pool.lua +++ b/pool.lua @@ -10,7 +10,7 @@ local serpent = dofile(base.."/serpent.lua") local libDeflate = dofile(base.."/LibDeflate.lua") local hasThreads = - not util.isWin and + --not util.isWin and config.NeatConfig.Threads > 1 -- Only the parent should manage ticks! @@ -399,9 +399,7 @@ local function addToSpecies(child) end local function initializePool() - local promise = Promise.new() - promise:resolve() - return promise:next(function() + return util.promiseWrap(function() pool = newPool() for i=1,config.NeatConfig.Population do @@ -420,9 +418,7 @@ local function bytes(x) end local function writeFile(filename) - local promise = Promise.new() - promise:resolve() - return promise:next(function () + return util.promiseWrap(function () local file = io.open(filename, "w") local dump = serpent.dump(pool) local zlib = libDeflate:CompressDeflate(dump) @@ -436,9 +432,7 @@ end -- FIXME This isn't technically asynchronous. Probably can't be though. local function loadFile(filename) - local promise = Promise.new() - promise:resolve() - return promise:next(function() + return util.promiseWrap(function() message("Loading pool from " .. filename, 0x00999900) local file = io.open(filename, "r") if file == nil then @@ -699,9 +693,7 @@ local function mainLoop(currentSpecies, topGenome) end local slice = pool.species[currentSpecies] - local promise = Promise.new() - promise:resolve() - return promise:next(function() + return util.promiseWrap(function() if loadRequested then loadRequested = false currentSpecies = nil diff --git a/runner-process.lua b/runner-process.lua index 87c982c..c85fe70 100644 --- a/runner-process.lua +++ b/runner-process.lua @@ -10,16 +10,10 @@ callback.register('timer', function() end) set_timer_timeout(1) - local Runner = dofile(base.."/runner.lua") local serpent = dofile(base.."/serpent.lua") local util = dofile(base.."/util.lua")(Promise) -local inputFilePath = os.getenv("RUNNER_INPUT_FILE") -local outputFilePath = os.getenv("RUNNER_OUTPUT_FILE") - -local outContents = {} - local statusLine = nil local statusColor = 0x0000ff00 @@ -27,20 +21,40 @@ local species = nil local speciesId = -1 local generationIndex = nil +local inputPipeName = os.getenv("RUNNER_INPUT_PIPE") +local outputFilePath = os.getenv("RUNNER_OUTPUT_FILE") + +print('Opening input pipe '..inputPipeName) +local inputPipe = util.openReadPipe(inputPipeName) +if inputPipe == nil then + error('Error opening input file') +end +print('Opened input pipe '..inputPipeName) + +print('Opening output file '..outputFilePath) +local outputFile = nil +while outputFile == nil do + outputFile = io.open(outputFilePath, 'w') +end +print('Opened output file '..outputFilePath) + +local function writeResponse(object) + outputFile:write(serpent.dump(object).."\n") + outputFile:flush() +end + local runner = Runner(Promise) runner.onMessage(function(msg, color) statusLine = msg statusColor = color print(msg) - table.insert( - outContents, - serpent.dump({ - type = 'onMessage', - speciesId = speciesId, - msg = msg, - color = color, - }) - ) + + writeResponse({ + type = 'onMessage', + speciesId = speciesId, + msg = msg, + color = color, + }) end) runner.onRenderForm(function(form) @@ -60,115 +74,77 @@ runner.onRenderForm(function(form) end) runner.onSave(function(filename) - table.insert( - outContents, - serpent.dump({ - type = 'onSave', - filename = filename, - speciesId = speciesId, - }) - ) + writeResponse({ + type = 'onSave', + filename = filename, + speciesId = speciesId, + }) end) runner.onLoad(function(filename) - table.insert( - outContents, - serpent.dump({ - type = 'onLoad', - filename = filename, - speciesId = speciesId, - }) - ) + writeResponse({ + type = 'onLoad', + filename = filename, + speciesId = speciesId, + }) end) -local function waitLoop() - local inputData = nil - local ok = false - while not ok or inputData == nil or speciesId == inputData[1].id do - local inputFile = io.open(inputFilePath, 'r') - ok, inputData = serpent.load(inputFile:read('*a')) - inputFile:close() +local function waitLoop(inputLine) + return util.promiseWrap(function() + local ok, inputData = serpent.load(inputLine) - if not ok then + if not ok or inputData == nil or speciesId == inputData[1].id then print("Deserialization error") + return end - end - print('Received input from master process') + print('Received input from master process') - species = inputData[1] + species = inputData[1] - speciesId = species.id + speciesId = species.id - generationIndex = inputData[2] + generationIndex = inputData[2] - outContents = {} + print('Running') - print('Running') - - return runner.run( - species, - generationIndex, - function(genome, index) - table.insert( - outContents, - serpent.dump({ + return runner.run( + species, + generationIndex, + function(genome, index) + writeResponse({ type = 'onGenome', genome = genome, genomeIndex = index, speciesId = speciesId, }) - ) - end - ):next(function() - table.insert( - outContents, - serpent.dump({ + end + ):next(function() + writeResponse({ type = 'onFinish', speciesId = speciesId, }) - ) - - -- Truncate the input file to reduce the amount of time - -- wasted if we reopen it too early - local inputFile = io.open(inputFilePath, "w") - inputFile:close() - - local waiter = nil - if util.isWin then - waiter = Promise.new() - waiter:resolve() - else - waiter = util.waitForFiles(inputFilePath) - end - - -- Write the result - local outFile = io.open(outputFilePath, "w") - outFile:write(table.concat(outContents, "\n")) - outFile:close() - - return waiter + end) + end):next(function() + return inputPipe:read("*l") end):next(waitLoop) end -local waiter = nil -if util.isWin then - waiter = Promise.new() - waiter:resolve() -else - waiter = util.waitForFiles(inputFilePath) -end - local sec, usec = utime() local ts = sec * 1000000 + usec -local outFile = io.open(outputFilePath, "w") -outFile:write(serpent.dump({ type = 'onInit', ts = ts })) -outFile:close() +local waiter = util.promiseWrap(function() + return inputPipe:read("*l") +end) + +writeResponse({ type = 'onInit', ts = ts }) print(string.format('Wrote init to output at %d', ts)) waiter:next(waitLoop):catch(function(error) + if type(error) == "table" then + error = "\n"..table.concat(error, "\n") + end print('Runner process error: '..error) io.stderr:write('Runner process error: '..error..'\n') -end) +end) \ No newline at end of file diff --git a/runner-wrapper.lua b/runner-wrapper.lua index ccfe051..e18e2ae 100644 --- a/runner-wrapper.lua +++ b/runner-wrapper.lua @@ -23,12 +23,12 @@ for i=1,#temps,1 do end end -local tmpFileName = tempDir.."/donk_runner_".. +local pipePrefix = "donk_runner_".. string.hex(math.floor(random.integer(0, 0xffffffff))).. string.hex(math.floor(random.integer(0, 0xffffffff))) -local inputPrefix = tmpFileName..'_input_' -local outputPrefix = tmpFileName..'_output_' +local inputPrefix = pipePrefix..'_input_' +local outputPrefix = pipePrefix..'_output_' local function message(_M, msg, color) if color == nil then @@ -69,11 +69,21 @@ end ---@param count integer Number of processes needed ---@return Promise Promise A promise that resolves when all the processes are ready local function launchChildren(_M, count) - local children = {} - while #_M.poppets < count do - local i = #_M.poppets+1 + local promises = {} + for i=#_M.poppets+1,count,1 do + local newOne = { + process = nil, + output = util.openReadPipe(outputPrefix..i), + input = nil, + } + local outputFileName = outputPrefix..i + local inputPipeName = inputPrefix..i local inputFileName = inputPrefix..i + if util.isWin then + outputFileName = '\\\\.\\pipe\\'..outputFileName + inputFileName = '\\\\.\\pipe\\'..inputPipeName + end local settingsDir = nil if util.isWin then @@ -82,21 +92,26 @@ local function launchChildren(_M, count) end local envs = { - RUNNER_INPUT_FILE = inputFileName, + RUNNER_INPUT_PIPE = inputPipeName, RUNNER_OUTPUT_FILE = outputFileName, APPDATA = settingsDir, } - local child = util.waitForFiles(outputFileName) - local cmd = '"'.._M.hostProcess..'" "--rom='..config.ROM..'" --unpause "--lua='..base..'/runner-process.lua"' - local poppet = util.popenCmd(cmd, nil, envs) - table.insert(_M.poppets, poppet) + newOne.process = util.popenCmd(cmd, nil, envs) - table.insert(children, child) + -- Wait for init + local promise = util.promiseWrap(function() + newOne.output:read("*l") + while newOne.input == nil do + newOne.input = io.open(inputFileName, 'w') + end + end) + table.insert(promises, promise) + table.insert(_M.poppets, newOne) end - return Promise.all(table.unpack(children)) + return Promise.all(table.unpack(promises)) end return function(promise) @@ -107,9 +122,9 @@ return function(promise) end -- FIXME Maybe don't do this in the "constructor"? if util.isWin then - util.downloadFile('https://github.com/watchexec/watchexec/releases/download/1.13.1/watchexec-1.13.1-x86_64-pc-windows-gnu.zip', base..'/watchexec.zip') - util.unzip(base..'/watchexec.zip', base) - os.rename(base..'watchexec-1.13.1-x86_64-pc-windows-gnu', base..'/watchexec') + util.downloadFile("https://github.com/psmay/windows-named-pipe-utils/releases/download/v0.1.1/build.zip", base.."/namedpipe.zip") + util.unzip(base.."/namedpipe.zip", base) + os.rename(base.."/build", "namedpipe") end local _M = { @@ -152,56 +167,31 @@ return function(promise) local promise = Promise.new() promise:resolve() return promise:next(function() - -- Create the input files and output files - for i=1,#speciesSlice,1 do - local inputFileName = inputPrefix..i - local inputFile = io.open(inputFileName, 'a') - inputFile:close() - - local outputFileName = outputPrefix..i - local outputFile = io.open(outputFileName, 'a') - outputFile:close() - end - return launchChildren(_M, #speciesSlice) end):next(function() - local outputFileNames = {} - for i=1,#speciesSlice,1 do - table.insert(outputFileNames, outputPrefix..i) - end - - local waiter = util.waitForFiles(outputFileNames, nil, tmpFileName.."_output_*") - message(_M, 'Setting up child processes') for i=1,#speciesSlice,1 do - - local inputFileName = tmpFileName.."_input_"..i - local inputFile = io.open(inputFileName, 'w') - inputFile:write(serpent.dump({speciesSlice[i], generationIdx})) - inputFile:close() + local inputPipe = _M.poppets[i].input + inputPipe:write(serpent.dump({speciesSlice[i], generationIdx}).."\n") + inputPipe:flush() end message(_M, 'Waiting for child processes to finish') - return waiter - end):next(function() - message(_M, 'Child processes finished') + local function readLoop(outputPipe, line) + return util.promiseWrap(function() + if line == nil or line == "" then + util.closeCmd(outputPipe) + end - local finished = 0 - for i=1,#speciesSlice,1 do - message(_M, "Processing output "..i) - local outputFileName = tmpFileName..'_output_'..i - local outputFile = io.open(outputFileName, "r") - local line = "" - repeat local ok, obj = serpent.load(line) if not ok then - goto continue + return false end if obj == nil then - goto continue + return false end if obj.type == 'onMessage' then @@ -220,18 +210,35 @@ return function(promise) end genomeCallback(obj.genome, obj.index) elseif obj.type == 'onFinish' then - finished = finished + 1 - if finished == #speciesSlice then - outputFile:close() - return - end + return true end - ::continue:: - line = outputFile:read() - until(line == "" or line == nil) + end):next(function(finished) + if finished then + return + end + + local line = outputPipe:read("*l") + return readLoop(outputPipe, line) + end) end - error(string.format("Some processes never finished? Saw %d terminations.", finished)) + + local waiters = {} + for i=1,#speciesSlice,1 do + local waiter = util.promiseWrap(function() + local outputPipe = _M.poppets[i].output + local line = outputPipe:read("*l") + + print("Started receiving output from child process "..i) + + return readLoop(outputPipe, line) + end) + table.insert(waiters, waiter) + end + + return Promise.all(table.unpack(waiters)) + end):next(function() + message(_M, 'Child processes finished') end) end diff --git a/state-test.lua b/state-test.lua index 8286489..c8f8ebb 100644 --- a/state-test.lua +++ b/state-test.lua @@ -1,4 +1,4 @@ -local memory, movie, utime, callback, set_timer_timeout = memory, movie, utime, callback, set_timer_timeout +local memory, movie, utime, callback, set_timer_timeout, input = memory, movie, utime, callback, set_timer_timeout, input local base = string.gsub(@@LUA_SCRIPT_FILENAME@@, "(.*[/\\])(.*)", "%1") local Promise = dofile(base.."/promise.lua") @@ -9,10 +9,9 @@ end) set_timer_timeout(1) local game = dofile(base.."/game.lua")(Promise) local util = dofile(base.."/util.lua")(Promise) +local serpent = dofile(base.."/serpent.lua") -game.registerHandlers() - -game.findPreferredExit():next(function(exit) - io.stderr:write(util.table_to_string(exit)) - io.stderr:write('\n') -end) \ No newline at end of file +local test = io.popen("cat > Z:\\UserProfiles\\EmpathicQubit\\testy.txt", 'w') +test:write("hello world\n") +test:flush() +test:close() \ No newline at end of file diff --git a/util.lua b/util.lua index f4f0a2b..9790638 100644 --- a/util.lua +++ b/util.lua @@ -8,6 +8,12 @@ local _M = {} _M.isWin = package.config:sub(1, 1) == '\\' +function _M.promiseWrap(next) + local promise = Promise.new() + promise:resolve() + return promise:next(next) +end + --- Echo a command, run it, and return the file handle --- @param cmd string The command to execute --- @param workdir string The working directory @@ -48,6 +54,13 @@ function _M.doCmd(...) return _M.scrapeCmd('*a', ...) end +-- FIXME linux +function _M.openReadPipe(name) + local cmd = 'cd /d "'..base..'" && "'..base..'/namedpipe/createAndReadPipe.exe" "'..name..'"' + print(cmd) + return io.popen(cmd, 'r') +end + --- Download a url --- @param url string URI of resource to download --- @param dest string File to save resource to @@ -59,9 +72,8 @@ end --- @param zipfile string The ZIP file path --- @param dest string Where to unzip the ZIP file. Beware ZIP bombs. function _M.unzip (zipfile, dest) - return _M.doCmd('unzip "'..zipfile..'" -d "'..dest.. - '" 2>&1 || tar -C "'..dest..'" -xvf "'..zipfile.. - '" 2>&1', nil) + return _M.doCmd('tar -xvf "'..zipfile..'" 2>&1 || unzip -n "'..zipfile..'" -d "'..dest.. + '" 2>&1', dest) end --- Create a directory @@ -100,77 +112,10 @@ function _M.closeCmd(handle) return end if code ~= 0 then - error("The last command failed") + error(string.format("The last command failed: %s %d", state, code)) end end -function _M.waitForFiles(filenames, count, wild) - local promise = Promise.new() - promise:resolve() - - if type(filenames) == 'string' then - if wild == nil then - wild = filenames - end - - filenames = {filenames} - end - - if count == nil then - count = #filenames - end - - local poppet = nil - if _M.isWin then - local sec, usec = utime() - print(string.format('Starting watching file at %d', sec * 1000000 + usec)) - - local cmd = '"'..base..'/watchexec/watchexec.exe" "-w" "'..table.concat(filenames, '" "-w" "')..'" "echo" "%WATCHEXEC_WRITTEN_PATH%"' - poppet = _M.popenCmd(cmd, base) - - poppet:read("*l") - else - local watchCmd = '' - if count == 1 then - watchCmd = [[which inotifywait >/dev/null && { inotifywait -q -e close_write ']]..filenames[1]..[[' || exit 0 ; }]] - else - watchCmd = [[bash <<'EOF' -COUNT=]]..count..[[ -FILENAMES=(']]..table.concat(filenames, "' '")..[[') -declare -A SEEN -((I = 0)) -set -m -which inotifywait >/dev/null -( inotifywait -q -m -e close_write "${FILENAMES[@]}" | while read LINE ; do - SEEN["$LINE"]=1 - TOTAL=${#SEEN[@]} - if ((TOTAL == COUNT)) ; then - kill -s TERM 0 - fi -done ) & -wait -EOF]] - end - poppet = _M.popenCmd(watchCmd) - end - - return promise:next(function() - if _M.isWin then - local i = 1 - while i <= count do - local line = poppet:read("*l") - for chr in line:gmatch(";") do - i = i + 1 - end - i = i + 1 - end - else - poppet:read("*a") - _M.closeCmd(poppet) - end - end) -end - function _M.table_to_string(tbl) local result = "{" local keys = {}