#!/usr/bin/python import sys, os, os.path, signal, socket, re, time def redirectOutput(fileName): os.close(1) os.close(2) fd = os.open(fileName, os.O_WRONLY|os.O_CREAT|os.O_APPEND, 0666) os.dup2(fd, 1) os.dup2(fd,2) def dumpHTML(outputFile, *params): fullParams = ["nice", "-n15", "php","-n","dumpHTML.php"] fullParams.extend(params) pid = os.fork() if pid == 0: redirectOutput(outputFile) os.execvp("nice", fullParams) sys.exit(1) # Wait for the child to exit (or the parent) status = os.waitpid(pid, os.WNOHANG) while status == (0,0) and os.getppid() > 1: time.sleep(5) status = os.waitpid(pid, os.WNOHANG) # If the parent exited, then kill the child if status == (0,0): os.kill(pid, signal.SIGKILL) def finishWiki(outputFile, lang): global edition, siteDir, baseDir if (not os.path.isdir(siteDir+"/"+lang+"-new")): msg("Already compressed "+lang) return msg("Finishing language "+lang) os.system("%(baseDir)s/scripts/finish-lang %(lang)s %(edition)s 2>&1 >> %(outputFile)s" % { 'baseDir' : baseDir, 'lang' : lang, 'edition' : edition, 'outputFile': outputFile }) msg("Done") def writeStatus(jobID, status): global baseDir f = open(baseDir+"/jobs/"+jobID, "w") print >> f, socket.gethostname(), os.getpid() print >> f, status f.close() def isStatusMine(jobID): global baseDir try: f = open(baseDir+"/jobs/"+jobID, "r") except: msg("Status file is missing") return False fields = f.readline().split() f.close() if len(fields) != 2: msg("Warning: invalid status file") return False if fields[0] == socket.gethostname() and fields[1] == str(os.getpid()): return True else: return False def isDone(checkpoint, jobType): test = jobType+'=done' try: f = open(checkpoint, "r") except: return False try: for line in f: if line.rstrip() == test: return True finally: f.close() return False def writeStatusIfMine(jobID, status): if isStatusMine(jobID): writeStatus(jobID, status) else: msg("Not overwriting status file, it doesn't belong to me.") def msg(*params): print " ".join(params) sys.stdout.flush() #--------------------------------------------------------------------------------- hostname = socket.gethostname() myPid = os.getpid() msg("queueSlave on %s %d" % (hostname, myPid)) queueHost = sys.argv[1] queuePort = int(sys.argv[2]) baseDir = sys.argv[3] edition = sys.argv[4] siteDir = baseDir+"/wikipedia" logDir = baseDir+"/logs" jobDir = baseDir+"/jobs" checkpointDir = baseDir+"/checkpoints" try: os.makedirs(logDir) except: pass try: os.makedirs(jobDir) except: pass try: os.makedirs(checkpointDir) except: pass queueSock = socket.socket() queueSock.connect((queueHost, queuePort)) queueFile = queueSock.makefile() os.chdir("/home/wikipedia/common/php-1.5/maintenance") waiting = False dataRegex = re.compile("data (\w+) ([a-z_-]+) (\w+) (\d+/\d+)") # Loop until the parent exits while (os.getppid() > 1): queueFile.write("deq\n") queueFile.flush() s = queueFile.readline() m = dataRegex.match(s) if m != None: waiting = False jobID = m.group(1) wiki = m.group(2) type = m.group(3) slice = m.group(4) lang = wiki.replace( 'wiki', '' ) dest = siteDir+"/"+lang+"-new" jobString = wiki+"_" + type + "_" + slice.replace( '/', '_' ) outputFile = logDir+"/"+jobString checkpoint = checkpointDir+"/"+jobString if type == "articles": writeStatus(jobID, 'running') msg(wiki + ' articles ' + slice) dumpHTML(outputFile, wiki,"--no-shared-desc", #"--force-copy", "--image-snapshot", "--interlang","-d",dest,"--slice",slice, "--checkpoint",checkpoint,"--no-overwrite") if isDone(checkpoint, 'everything'): msg("Done") writeStatusIfMine(jobID, 'done') else: msg("Terminated, unfinished") writeStatusIfMine(jobID, 'terminated') elif type == "shared": writeStatus(jobID, 'running') msg(wiki + ' shared ' + slice) dumpHTML(outputFile, wiki,"--shared-desc", #"--force-copy", "--image-snapshot", "--interlang","-d",dest,"--slice",slice, "--checkpoint",checkpoint,"--no-overwrite") if isDone(checkpoint, 'shared image'): msg("Done") writeStatusIfMine(jobID, 'done') else: msg("Terminated, unfinished") writeStatusIfMine(jobID, 'terminated') elif type == "finish": writeStatus(jobID, 'running') finishWiki(outputFile, lang) writeStatusIfMine(jobID, 'done') else: if not waiting: msg("Waiting...") waiting = True time.sleep(5)