How can I limit the Max Active Jobs per workflow for one FFastrans.

JohnB1
Posts: 35
Joined: Thu Feb 15, 2018 12:42 pm

How can I limit the Max Active Jobs per workflow for one FFastrans.

Post by JohnB1 »

Hi Guys,

I was wondering whether we can set Max Active Jobs per workflow in a single FFAStrans instance.

For example:

Say I have workflow A and workflow B, and I want workflow A to process 1 file at a time and workflow B to process 8 files at a time.

Is it possible to do this?

I'd really appreciate any help.

Thanks Again,
John
momocampo
Posts: 594
Joined: Thu Jun 08, 2017 12:36 pm
Location: France-Paris

Re: How can I limit the Max Active Jobs per workflow for one FFastrans.

Post by momocampo »

Hello John,

I don't think it is possible like that. You can use the "priority" setting in the workflow properties, but I think that's the only setting available.
So, set it to low for one workflow and to high for the other.
Cheers.

Benjamin
emcodem
Posts: 1749
Joined: Wed Sep 19, 2018 8:11 am

Re: How can I limit the Max Active Jobs per workflow for one FFastrans.

Post by emcodem »

Hey there :-)
It is possible using the API, but it requires some lines of code. One could count the running jobs every 5 seconds and pause/resume the jobs that need to be queued. It would only be a workaround, though, because jobs are paused instead of "queued".
If you are interested, I could come up with something that uses the scheduler of my webinterface for it.
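
To make the count-then-pause/resume idea a bit more concrete, here is a minimal sketch of just the decision step. It is not the actual implementation: `planActions` is a hypothetical helper, the jobs are assumed to arrive ordered oldest first, and `job_id` is an assumed field name. No API calls are made.

```javascript
// Hypothetical helper: decide which jobs keep running and which get paused.
// Assumes `jobs` is ordered oldest first and `maxJobs` is the per-workflow limit.
function planActions(jobs, maxJobs) {
  return {
    // the first maxJobs jobs may run
    resume: jobs.slice(0, maxJobs).map((job) => job.job_id),
    // everything beyond the limit is paused, which stands in for "queued"
    pause: jobs.slice(maxJobs).map((job) => job.job_id),
  };
}

// Example: three jobs of one workflow, limit of one.
console.log(planActions([{ job_id: "a" }, { job_id: "b" }, { job_id: "c" }], 1));
```

Keeping the decision separate from the HTTP calls makes it easy to check the logic without a running FFAStrans instance.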
emcodem, wrapping since 2009 you got the rhyme?
JohnB1
Posts: 35
Joined: Thu Feb 15, 2018 12:42 pm

Re: How can I limit the Max Active Jobs per workflow for one FFastrans.

Post by JohnB1 »

Thanks for your response guys.

@emcodem, it would be great if you could come up with something; that would be an immense help.

Let me know when you do.

Thanks Again,
John
emcodem
Posts: 1749
Joined: Wed Sep 19, 2018 8:11 am

Re: How can I limit the Max Active Jobs per workflow for one FFastrans.

Post by emcodem »

No problem, John :-)

OK, the idea is that the script checks once a minute whether there are jobs to be controlled. This means that, currently, a job may process for anywhere between 0 and 60 seconds before it is sent to sleep. I hope your jobs are very long-running; then this should suffice. So every 60 seconds, it gets a list of all "active and paused" jobs and checks whether it should pause or resume any of them.

Let me know if it works for you.

How it works: download my webinterface (sticky thread) and run it on the same host that FFAStrans is running on.
-) start server.exe of my webinterface (or, better, install_service.bat, but for testing, starting server.exe should work)
-) open http://localhost:3002/
-) open the scheduler in the interface (the calendar symbol in the left menu)
-) hit the "New" button to create a new job and set the frequency to "Minute"; this makes it run every minute
-) hit the "Edit Condition" button, delete all content and insert the code below
-) look at the first 4 lines of the code; this is where you configure how many jobs of each workflow may run in parallel

Code:

var jobQueue = {}
//USER CONFIGURATION: COPY THIS LINE FOR EACH WORKFLOW YOU LIKE TO CONTROL, EDIT ONLY WORKFLOWNAME AND max_jobs number
jobQueue["workflowname"] = {"max_jobs":1,"active":[],"paused":[]}
jobQueue["anotherworkflowname"] = {"max_jobs":8,"active":[],"paused":[]}


//SCRIPT - DO NOT MODIFY IF YOU DON'T KNOW WHAT YOU DO
var getjobsurl = 'http://localhost:65445/api/json/v1/jobs';
const http = require('http');

//get list of jobs
http.get(getjobsurl, (resp) => {
  let data = '';
  resp.on('data', (chunk) => {
    data += chunk;
  });
  resp.on('end', () => {
    //process jobs
     processJobQue(data);
     pauseResumeJobs()
  });
}).on("error", (err) => {
  console.log("Error: " + err.message);
});


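function processJobQue(jobData){
  //parse the API response and collect the jobs of every configured workflow
  var parsed = JSON.parse(jobData);
  for (var i = 0; i < parsed.jobs.length; i++){
    var job = parsed.jobs[i];
    if (jobQueue[job["wf_name"]]){
      //note: "active" holds both the running and the paused jobs of the workflow
      jobQueue[job["wf_name"]]["active"].push(job);
    }
  }
  console.log(jobQueue)
}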

function pauseResumeJobs(){
  //for each workflow, count paused and active jobs
  for (var key in jobQueue) {
    //if there are no paused jobs and fewer jobs than configured, do nothing
    if (countPaused(jobQueue[key]["active"]) == 0 && jobQueue[key]["active"].length < jobQueue[key]['max_jobs']){
      continue;
    }

    var activeCount = jobQueue[key]["active"].length;
    var max = jobQueue[key]['max_jobs'];

    //pause every job beyond the configured limit
    //we assume that jobs are sorted by date already
    for (var j = max; j < activeCount; j++){
      if (!jobQueue[key]["active"][j]){
        continue;//not enough jobs
      }
      jobCommand(jobQueue[key]["active"][j]["job_id"], "Pause")
    }

    //resume (activate) the first max_jobs jobs
    for (var j = 0; j < max; j++){
      if (!jobQueue[key]["active"][j]){
        continue;//not enough jobs
      }
      jobCommand(jobQueue[key]["active"][j]["job_id"], "resume")
    }
  }
}

//send api command to ffastrans server
function jobCommand(jobId,command){
  console.log("Sending "+command+ " to job " +jobId)
  var body = {};
  body["action"] = command;
  body["split"] = 1;
  const options = {
    hostname: 'localhost',
    port: 65445,
    path: '/api/json/v1/jobs/' + jobId,
    method: 'PUT',
    headers: {
      'Content-Type': 'application/json',
      'Content-Length': JSON.stringify(body).length
    }
  }
  const req = http.request(options, (res) => {
    console.log(`statusCode: ${res.statusCode}`)
    res.on('data', (d) => {
      console.log("Success:" + d)
    })
  })
  req.on('error', (error) => {
    console.error(error)
  })
  req.write(JSON.stringify(body))
  req.end()

}

function countPaused(jobArray){
  //count the jobs whose first split reports a "Paused" status
  var paused = 0;
  for (var i = 0; i < jobArray.length; i++){
    if (jobArray[i].splits[0]['status'].indexOf('Paused') != -1){
      paused++;
    }
  }
  return paused;
}

LIMITATIONS:
If a user wants to pause an active job and activate an older one (reprioritize), the script would override this on its next run, again activating the oldest jobs and pausing the youngest ones. Let me know if you need that functionality.
Also, it currently only works if your workflows have only one branch with transcoding. As a quick and dirty workaround, you could add a "hold" processor and set it to sleep for 70 seconds before your workflow splits into multiple branches.
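
One possible way to approach the reprioritize limitation, shown only as a rough sketch: reorder the job list before the limit is applied, so that manually chosen jobs come first. `applyPinned` and the `pinnedIds` list are hypothetical and not part of the script above; how such a list would be maintained is left open.

```javascript
// Hypothetical helper, not part of the script above: move manually
// prioritised jobs to the front while keeping the order of the others.
function applyPinned(jobs, pinnedIds) {
  const pinned = jobs.filter((job) => pinnedIds.includes(job.job_id));
  const rest = jobs.filter((job) => !pinnedIds.includes(job.job_id));
  return pinned.concat(rest);
}

// Example: job "c" is pinned, so it is first in line for a max_jobs slot.
const ordered = applyPinned(
  [{ job_id: "a" }, { job_id: "b" }, { job_id: "c" }],
  ["c"]
);
console.log(ordered.map((job) => job.job_id));
```

With the list reordered this way, the "first max_jobs entries run, the rest are paused" rule would no longer undo a manual choice on the next run.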
Tomical
Posts: 5
Joined: Wed Apr 08, 2020 10:39 am

Re: How can I limit the Max Active Jobs per workflow for one FFastrans.

Post by Tomical »

Hello,

I would like to know how to limit the number of jobs per workflow.
I am posting here so as not to create a new topic. I tested via the webinterface, but it doesn't work. I think some lines have changed since 2018. Is there another method, or could you provide the correct lines of code?

Thank you,
Thomas
emcodem
Posts: 1749
Joined: Wed Sep 19, 2018 8:11 am

Re: How can I limit the Max Active Jobs per workflow for one FFastrans.

Post by emcodem »

Aye @Tomical,

sure, no problem. Here is the same as above, but compatible with FFAStrans > 1.0.
Note that only the first few lines are really interesting for you. In my case, I just control the workflow "Rewrap_OPAtom".
If you want to control more workflows, uncomment the line with "anotherworkflowname"; you can add as many lines with workflow names as you wish.
Let me know if it works for you.

Code:

var jobQueue = {}
//USER CONFIGURATION: COPY THIS LINE FOR EACH WORKFLOW YOU LIKE TO CONTROL, EDIT ONLY WORKFLOWNAME AND max_jobs number
jobQueue["Rewrap_OPAtom"] = {"max_jobs":1,"active":[],"paused":[]}
//jobQueue["anotherworkflowname"] = {"max_jobs":8,"active":[],"paused":[]}


//SCRIPT - DO NOT MODIFY IF YOU DON'T KNOW WHAT YOU DO
var getjobsurl = 'http://localhost:65445/api/json/v2/jobs';
const http = require('http');

//get list of jobs
http.get(getjobsurl, (resp) => {
  let data = '';
  resp.on('data', (chunk) => {
    data += chunk;
  });
  resp.on('end', () => {
    //process jobs
     
     processJobQue(data);
     pauseResumeJobs()
  });
}).on("error", (err) => {
  console.log("Error: " + err.message);
});


function processJobQue(jobData){
  //parse the API response and collect the jobs of every configured workflow
  var parsed = JSON.parse(jobData);
  for (var i = 0; i < parsed.jobs.length; i++){
    var job = parsed.jobs[i];
    console.log(job["workflow"])
    if (jobQueue[job["workflow"]]){
      //note: "active" holds both the running and the paused jobs of the workflow
      jobQueue[job["workflow"]]["active"].push(job);
    }
  }
  console.log(jobQueue)
}

function pauseResumeJobs(){
  //for each workflow, count paused and active jobs
  for (var key in jobQueue) {
    //if there are no paused jobs and fewer jobs than configured, do nothing
    if (countPaused(jobQueue[key]["active"]) == 0 && jobQueue[key]["active"].length < jobQueue[key]['max_jobs']){
      continue;
    }

    var activeCount = jobQueue[key]["active"].length;
    var max = jobQueue[key]['max_jobs'];

    //pause every job beyond the configured limit
    //we assume that jobs are sorted by date already
    for (var j = max; j < activeCount; j++){
      if (!jobQueue[key]["active"][j]){
        continue;//not enough jobs
      }
      jobCommand(jobQueue[key]["active"][j], "Pause")
    }

    //resume (activate) the first max_jobs jobs
    for (var j = 0; j < max; j++){
      if (!jobQueue[key]["active"][j]){
        continue;//not enough jobs
      }
      jobCommand(jobQueue[key]["active"][j], "resume")
    }
  }
}

//send api command to ffastrans server
function jobCommand(job,command){
  
  console.log("Sending "+command+ " to job " +job["job_id"])
  var body = {};
  body["action"] = command;
  body["split_id"] = job["split_id"];
  const options = {
    hostname: 'localhost',
    port: 65445,
    path: '/api/json/v2/jobs/' + job["job_id"],
    method: 'PUT',
    headers: {
      'Content-Type': 'application/json',
      'Content-Length': JSON.stringify(body).length
    }
  }
  const req = http.request(options, (res) => {
    console.log(`statusCode: ${res.statusCode}`)
    res.on('data', (d) => {
      console.log("Success:" + d)
    })
  })
  req.on('error', (error) => {
    console.error(error)
  })
  req.write(JSON.stringify(body))
  req.end()

}

function countPaused(jobArray){
  //count the jobs that report a "Paused" status
  var paused = 0;
  for (var i = 0; i < jobArray.length; i++){
    if (jobArray[i]['status'].indexOf('Paused') != -1){
      paused++;
    }
  }
  return paused;
}
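
If the script does not seem to control anything, one thing worth checking is whether the names in the configuration lines match what the API reports. The sketch below tallies jobs per workflow name from an already parsed job list. `countJobsPerWorkflow` is a hypothetical helper, and the `workflow` field name is taken from the script above; the sample data is made up.

```javascript
// Hypothetical helper: count how many jobs each workflow name has.
// Assumes every job object carries a "workflow" field, as in the script above.
function countJobsPerWorkflow(jobs) {
  const counts = {};
  for (const job of jobs) {
    counts[job.workflow] = (counts[job.workflow] || 0) + 1;
  }
  return counts;
}

// Made-up sample data for illustration only.
const sampleJobs = [
  { workflow: "Rewrap_OPAtom" },
  { workflow: "Rewrap_OPAtom" },
  { workflow: "anotherworkflowname" },
];
console.log(countJobsPerWorkflow(sampleJobs));
```

The keys of the returned object are the names that would go into the jobQueue configuration lines.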
Tomical
Posts: 5
Joined: Wed Apr 08, 2020 10:39 am

Re: How can I limit the Max Active Jobs per workflow for one FFastrans.

Post by Tomical »

Hey,

Thanks a lot, it works, but my problem is not solved ='(

There are 2 workflows: the first one converts subtitle files.
The second one is a video watchfolder in which I use a node to wait for the converted subtitle file, followed by a node that burns the subtitles in (hardcode).

Imagine that I configured FFAStrans for a maximum of 4 active jobs. If 4 videos arrive in the watchfolder and their subtitles are not yet available, the other workflow, which converts the subtitle files, cannot run!

I don't know if it's clear =)

If you have an idea to work around the problem, it would be great!

br
Tomical
Posts: 5
Joined: Wed Apr 08, 2020 10:39 am

Re: How can I limit the Max Active Jobs per workflow for one FFastrans.

Post by Tomical »

It's OK, I found a solution.
I've merged the two workflows; it's a better solution.
emcodem
Posts: 1749
Joined: Wed Sep 19, 2018 8:11 am

Re: How can I limit the Max Active Jobs per workflow for one FFastrans.

Post by emcodem »

Yeah, merging the workflows sounds very reasonable :-)