audit_report: Use concurrent.futures for parallelization.
This is basically a pure maintainability change: concurrent.futures is the nicest API that's available in both Python 3.6 and 3.7, and our other tools are using it.
This commit is contained in:
		
							parent
							
								
									ad96fdfa90
								
							
						
					
					
						commit
						d9360f1cea
					
				
					 1 changed files with 5 additions and 5 deletions
				
			
		| 
						 | 
					@ -15,9 +15,9 @@
 | 
				
			||||||
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 | 
					# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import argparse
 | 
					import argparse
 | 
				
			||||||
 | 
					import concurrent.futures as futmod
 | 
				
			||||||
import datetime
 | 
					import datetime
 | 
				
			||||||
import logging
 | 
					import logging
 | 
				
			||||||
import multiprocessing
 | 
					 | 
				
			||||||
import os
 | 
					import os
 | 
				
			||||||
import runpy
 | 
					import runpy
 | 
				
			||||||
import sys
 | 
					import sys
 | 
				
			||||||
| 
						 | 
					@ -198,10 +198,10 @@ def main(arglist: Optional[Sequence[str]]=None,
 | 
				
			||||||
        logger.critical("no books available to load")
 | 
					        logger.critical("no books available to load")
 | 
				
			||||||
        return os.EX_NOINPUT
 | 
					        return os.EX_NOINPUT
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    with multiprocessing.Pool(args.jobs, maxtasksperchild=1) as pool:
 | 
					    with futmod.ProcessPoolExecutor(args.jobs) as pool:
 | 
				
			||||||
        logger.debug("%s: process pool ready with %s workers", now_s(), args.jobs)
 | 
					        logger.debug("%s: process pool ready with %s workers", now_s(), args.jobs)
 | 
				
			||||||
        fy_paths = books._iter_fy_books(fy.range(args.audit_year - 1, args.end_date))
 | 
					        fy_paths = books._iter_fy_books(fy.range(args.audit_year - 1, args.end_date))
 | 
				
			||||||
        check_results = pool.imap_unordered(bean_check, fy_paths)
 | 
					        check_results = pool.map(bean_check, fy_paths)
 | 
				
			||||||
        if all(exitcode == 0 for exitcode in check_results):
 | 
					        if all(exitcode == 0 for exitcode in check_results):
 | 
				
			||||||
            logger.debug("%s: bean-check passed", now_s())
 | 
					            logger.debug("%s: bean-check passed", now_s())
 | 
				
			||||||
        else:
 | 
					        else:
 | 
				
			||||||
| 
						 | 
					@ -214,10 +214,10 @@ def main(arglist: Optional[Sequence[str]]=None,
 | 
				
			||||||
                return os.EX_DATAERR
 | 
					                return os.EX_DATAERR
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        report_results = [
 | 
					        report_results = [
 | 
				
			||||||
            pool.apply_async(report_func, (arglist,), {'config': config})
 | 
					            pool.submit(report_func, arglist, config=config)
 | 
				
			||||||
            for report_func, arglist in reports
 | 
					            for report_func, arglist in reports
 | 
				
			||||||
        ]
 | 
					        ]
 | 
				
			||||||
        report_errors = [res.get() for res in report_results if res.get() != 0]
 | 
					        report_errors = [res.result() for res in report_results if res.result() != 0]
 | 
				
			||||||
        if not report_errors:
 | 
					        if not report_errors:
 | 
				
			||||||
            logger.debug("%s: all reports generated", now_s())
 | 
					            logger.debug("%s: all reports generated", now_s())
 | 
				
			||||||
        else:
 | 
					        else:
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
	Add table
		
		Reference in a new issue