
    `f                     t   d Z ddlZddlZddlZddlZddlZddlZddlmZm	Z	m
Z
 ddlmZ ddlmZ ddlmZ ddlmZmZmZ ddlmZ dd	lmZmZ dd
lmZ ddlmZmZ ddlm Z m!Z!m"Z"m#Z#m$Z$ ddl%m&Z& ddl'm(Z(m)Z) ddl*mZ+ ddl,m-Z- ddl.m/Z/ ddl0m1Z1 ddl2m3Z3 ddl4m5Z5 ddl6m7Z7 ddl8m9Z: 	 ddl;m<Z= dZ>n# e?$ r ej<        fdZ=dZ>efdZY nw xY wdZ@ e7eA          ZBeBjC        eBjD        cZCZD eEejF        ejG        h          ZHdZIdZJdZKd ZLeLeLeKeKeLd!ZMd" eMN                                D             ZO e
d#d$          ZPd% ZQd& ZR eSed'          r!ddddejT        ejU        ejV        ejW        fd(ZXnd2d)ZXddddeXfd*ZYd+ ZZ G d, d-ej[                  Z[ G d. d/ej\                  Z\ G d0 d1ej]                  Z^dS )3a  Version of multiprocessing.Pool using Async I/O.

.. note::

    This module will be moved soon, so don't use it directly.

This is a non-blocking version of :class:`multiprocessing.Pool`.

This code deals with three major challenges:

#. Starting up child processes and keeping them running.
#. Sending jobs to the processes and receiving results back.
#. Safely shutting down this system.
    N)Counterdeque
namedtuple)BytesIO)Integral)HIGHEST_PROTOCOL)packunpackunpack_from)sleep)WeakValueDictionaryref)pool)
isblockingsetblocking)ACKNACKRUN	TERMINATEWorkersJoined)_SimpleQueue)ERRWRITE)pickle)SELECT_BAD_FD)fxrange)promise)worker_before_create_process)noop)
get_logger)state)readTc                 r     || |          }t          |          }|dk    r|                    |           |S Nr   )lenwrite)fdbufsizer"   chunkns         M/var/www/html/env/lib/python3.11/site-packages/celery/concurrency/asynpool.py__read__r-   5   s<    RJJ66IIe    Fc                 >     || |                                           S N)getvalue)fmtiobufr
   s      r,   r   r   =   s    vc5>>++,,,r.   )AsynPool   g      @      )Ndefaultfastfcfsfairc                     i | ]\  }}||	S  r=   ).0kvs      r,   
<dictcomp>rA   W   s    DDD41a!QDDDr.   Ack)idr'   payloadc                 2    t          j        |           dk    S )z(Return true if generator is not started.GEN_CREATED)inspectgetgeneratorstate)gens    r,   gen_not_startedrJ   \   s    $S))]::r.   c                 H    	 | j         } |            S # t          $ r Y d S w xY wr0   )_writerAttributeError)jobwriters     r,   _get_job_writerrP   a   s?     vxx    s    
!!pollc                 4    |            }|j         | rfd| D              |rfd|D              |rfd|D              t                      t                      }
}	|r|dk     rdnt          |dz            }|                    |          }|D ]|\  }}t	          |t
                    s|                                }|z  r|	                    |           |z  r|
                    |           |z  r|	                    |           }|	|
dfS )Nc                 (    g | ]} |          S r=   r=   )r>   r'   POLLINregisters     r,   
<listcomp>z_select_imp.<locals>.<listcomp>r   s%    444bXXb&!!444r.   c                 (    g | ]} |          S r=   r=   )r>   r'   POLLOUTrU   s     r,   rV   z_select_imp.<locals>.<listcomp>t   s%    555rXXb'""555r.   c                 (    g | ]} |          S r=   r=   )r>   r'   POLLERRrU   s     r,   rV   z_select_imp.<locals>.<listcomp>v   s%    111rXXb'""111r.   r   g     @@)rU   setroundrQ   
isinstancer   filenoadd)readerswriterserrtimeoutrQ   rT   rX   rZ   pollerRWeventsr'   eventrU   s        ```      @r,   _select_impri   k   s\    ? 	544444G4444 	655555W5555 	211111S1111uucee1H7Q;;!!E'C-4H4HW%% 	 	IBb(++ !YY[[v~ b			w b			w b			!Qwr.   c                     t          j         | |||          \  }}}|r,t          t          |          t          |          z            }||dfS r$   )selectlistr[   )r`   ra   rb   rc   rwes          r,   ri   ri      sM    -#w??1a 	&SVVc!ff_%%A!Qwr.   c                    | t                      n| } |t                      n|}|t                      n|}	  || |||          S # t          $ r}|j        }|t          j        k    r"t                      t                      dfcY d}~S |t          v r| |z  |z  D ]}	 t          j        |gg g d           # t          $ rZ}|j        }|t          vr |                     |           |                    |           |                    |           Y d}~{d}~ww xY wt                      t                      dfcY d}~S  d}~ww xY w)a<  Simple wrapper to :class:`~select.select`, using :`~select.poll`.

    Arguments:
        readers (Set[Fd]): Set of reader fds to test if readable.
        writers (Set[Fd]): Set of writer fds to test if writable.
        err (Set[Fd]): Set of fds to test for error condition.

    All fd sets passed must be mutable as this function
    will remove non-working fds from them, this also means
    the caller must make sure there are still fds in the sets
    before calling us again.

    Returns:
        Tuple[Set, Set, Set]: of ``(readable, writable, again)``, where
        ``readable`` is a set of fds that have data available for read,
        ``writable`` is a set of fds that's ready to be written to
        and ``again`` is a flag that if set means the caller must
        throw away the result and call us again.
    Nr6   r   )r[   OSErrorerrnoEINTRr   rk   discard)r`   ra   rb   rc   rQ   exc_errnor'   s           r,   _selectrw      s   * ceeeGGceeeGG;#%%%CCtGWc7333   U[  55#%%?""""""}$$'#- 
$ 
$	$M2$B2222 $ $ $ YF]22OOB'''OOB'''KKOOOOOOOO$ 55#%%?""""""'sZ   A 
E3E E	E B65E 6
D ADE DE 9E?E  Ec                   	 	fd}g }| D ]d	 |            |}}	  |	g|R i | # t           t          f$ r5 t                              d	d           |                    	           Y aw xY w|rn|D ]m		 t          |d          r|                    	           n|                    	d           @# t          $ r t                              d	|           Y hw xY wdS dS )a  Apply hub method to fds in iter, remove from list if failure.

    Some file descriptors may become stale through OS reasons
    or possibly other reasons, so safely manage our lists of FDs.
    :param fds_iter: the file descriptors to iterate and apply hub_method
    :param source_data: data source to remove FD if it renders OSError
    :param hub_method: the method to call with with each fd and kwargs
    :*args to pass through to the hub_method;
    with a special syntax string '*fd*' represents a substitution
    for the current fd object in the iteration (for some callers).
    :**kwargs to pass through to the hub method (no substitutions needed)
    c                  0    } d| v rfdD             } | S )N*fd*c                 $    g | ]}|d k    rn|S )rz   r=   )r>   argr'   s     r,   rV   zTiterate_file_descriptors_safely.<locals>._meta_fd_argument_maker.<locals>.<listcomp>   s%    FFF#sf}}#FFFr.   r=   )	call_argsargsr'   s    r,   _meta_fd_argument_makerz@iterate_file_descriptors_safely.<locals>._meta_fd_argument_maker   s2    	YFFFFFFFIr.   z)Encountered OSError when accessing fd %s Texc_inforemoveNz*ValueError trying to invalidate %s from %s)	rq   FileNotFoundErrorloggerwarningappendhasattrr   pop
ValueError)
fds_itersource_data
hub_methodr~   kwargsr   	stale_fdshub_args
hub_kwargsr'   s
      `     @r,   iterate_file_descriptors_safelyr      s         I 	! 	!6688&*	!Jr3H333
3333*+ 	! 	! 	!NN;T  # # # R     		!  	0 	0 	0B0;11 .&&r****OOB--- 0 0 0K!;0 0 0 0 00	0 	0	0 	0s"   )AA/.A/9<B66&CCc                       e Zd ZdZd ZdS )WorkerzPool worker process.c                 J    | j                             t          |ff           d S r0   )outqput	WORKER_UP)selfpids     r,   on_loop_startzWorker.on_loop_start   s%     		y3&)*****r.   N)__name__
__module____qualname____doc__r   r=   r.   r,   r   r      s)        + + + + +r.   r   c                   \     e Zd ZdZ fdZeeeee	j
        fdZd Zd Zd Zd Zd Z xZS )	ResultHandlerz)Handles messages from the pool processes.c                     |                     d          | _        |                     d          | _         t                      j        |i | | j        | j        t          <   d S )Nfileno_to_outqon_process_alive)r   r   r   super__init__state_handlersr   )r   r~   r   	__class__s      r,   r   zResultHandler.__init__   s_    $jj)9:: &

+= > >$)&))))-)>I&&&r.   c	              #     K   dx}	}
|rt          d          }t          |          }n |            x}}|	dk     rx	  |||r
||	d          n|d|	z
            }|dk    r|	rt          d          nt                      |	|z  }	n*# t          $ r}|j        t
          vr d V  Y d }~nd }~ww xY w|	dk     x |d|          \  }|rt          |          }t          |          }n |            x}}|
|k     rx	  |||r
||
d          n|||
z
            }|dk    r|
rt          d          nt                      |
|z  }
n*# t          $ r}|j        t
          vr d V  Y d }~nd }~ww xY w|
|k     x ||| j        |           |r | ||                    }n |                    d            ||          }|r ||           d S d S )Nr   r7   zEnd of file during messagez>i)	bytearray
memoryviewrq   EOFErrorrr   UNAVAILhandle_eventseek)r   
add_readerr'   callbackr-   
readcanbufr   r   loadHrBrr(   bufvr+   ru   	body_sizemessages                    r,   _recv_messagezResultHandler._recv_message   s      R 	#A,,Cc??DD "C$ 1ffHZ9RSS		T1r6  66DF +7#?@@@ (

,a    9G++ 1ff ![t,,
	 	#I&&Cc??DD "C$9nnHZ9RSS		T9r>  66DF +7#?@@@ (

,a    9G++ 9nn 	
2t("--- 	!d774==))GGIIaLLLd4jjG 	HW	 	s/   B 
B)B$$B)4D; ;
E"EE"c                 h    | j         | j        |j        |j        | j        fd}|S )z3Coroutine reading messages from the pool processes.c                    	 |           n# t           $ r  |           cY S w xY w |           }	 t          |            | |           d S # t          $ r Y d S t          t          f$ r  |            Y d S w xY wr0   )KeyErrornextStopIterationrq   r   )r^   itr   r   on_state_changerecv_messageremove_readers     r,   on_result_readablez>ResultHandler._make_process_result.<locals>.on_result_readable?  s    -v&&& - - -$}V,,,,,-j&/BBB'R 
62&&&&& !   X& & & &f%%%%%%&s#    $$A 
A>A>=A>)r   r   r   r   r   )r   hubr   r   r   r   r   r   s      @@@@@r,   _make_process_resultz"ResultHandler._make_process_result7  sc    ,.^
))	' 	' 	' 	' 	' 	' 	' 	' 	' "!r.   c                 :    |                      |          | _        d S r0   )r   r   )r   r   s     r,   register_with_event_loopz&ResultHandler.register_with_event_loopO  s     55c::r.   c                      t          d          )NzNot registered with event loop)RuntimeError)r   r~   s     r,   r   zResultHandler.handle_eventR  s     ;<<<r.   c           	         | j         }| j        }| j        }| j        }| j        }t          |          }|r|r| j        t          k    r|
 |             t                      }|D ]T}t          |g| j        | j	        |j
        ||           	  |d           4# t          $ r t          d           Y  d S w xY w|                    |           |r|r| j        t          k    d S d S d S d S d S d S )NT)shutdownz&result handler: all workers terminated)cachecheck_timeoutsr   r   join_exited_workersr[   _stater   r   _flush_outqueuer_   r   debugdifference_update)	r   r   r   r   r   r   	outqueuespending_remove_fdr'   s	            r,   on_stop_not_startedz!ResultHandler.on_stop_not_startedW  sc   
,,."6 ''	 	;	 	;dkY&>&>)    # 	 	/D$-t/C%)>?  ''66666$   BCCCFFF ''(9:::!  	;	 	;dkY&>&>&>&> 	; 	; 	; 	;&>&> 	; 	; 	; 	;s   
BB54B5c                    	 ||         }n# t           $ r  ||          cY S w xY w|j        j        }	 t          |d           n# t          $ r  ||          cY S w xY w	 |                    d          r|                                }nd }t          d           |r ||           nP# t          t          f$ r<  ||          cY 	 t          |d           S # t          $ r  ||          cY c S w xY ww xY w	 t          |d           d S # t          $ r  ||          cY S w xY w# 	 t          |d           w # t          $ r  ||          cY c cY S w xY wxY w)Nr6   r         ?)	r   r   _readerr   rq   rQ   recvr   r   )r   r'   r   process_indexr   procreadertasks           r,   r   zResultHandler._flush_outqueues  s	   	 $DD 	 	 	 6"::		 "	"""" 	 	 	6"::		"{{1~~ {{}}c


  &%%%	 " 	 	 	6"::
"FA&&&& " " "vbzz!!!!!"	"FA&&&&& " " "vbzz!!!""FA&&&& " " "vbzz!!!!!!!"s    ##A AA ;B) D' )C6D' CC21C25C66D' :D D$#D$'E)D:9E:EEEE)r   r   r   r   r   r-   r   r   r   _pickler   r   r   r   r   r   r   __classcell__r   s   @r,   r   r      s        33? ? ? ? ?  (J%;"<7 7 7 7r" " "0; ; ;= = =
; ; ;8" " " " " " "r.   r   c                   |    e Zd ZdZeZeZdZ fdZ	 	 d) fd	Z fdZ	d Z
d Zd	 Zd
 Zd Zd Zd Zd Zd Zeej        efdZd Zd Zd Zd Zd Zd Zd Zd Zd Z d Z!d Z"e#d             Z$ fdZ%d Z&d Z'd  Z(d! Z)d" Z*d# Z+ej        eefd$Z,e-d%             Z.d& Z/e-d'             Z0e1d(             Z2 xZ3S )*r4   zAsyncIO Pool (no threads).Fc                 X    t                                          |          }d|_        |S )NF)r   WorkerProcessdead)r   workerr   s     r,   r   zAsynPool.WorkerProcess  s&    &&v..r.   Nc                 H    t                               ||           _        |                                 n|}| _         fdt          |          D              _        i  _        i  _        i  _	        |t          n| _        t                       _        t                       _        t                       _        t                       _        t                       _         j        j         _        t)                       _        t-                       _         t1                      j        |g|R i |  j        D ] }| j        |j        <   | j	        |j        <   !t;           j        dt>                     _         t;           j        dt>                     _!        d S )Nc                 :    i | ]}                                 d S r0   create_process_queuesr>   _r   s     r,   rA   z%AsynPool.__init__.<locals>.<dictcomp>  s4     
 
 
34D&&(($
 
 
r.   on_soft_timeouton_hard_timeout)"SCHED_STRATEGIESgetsched_strategy	cpu_countsynackrange_queues_fileno_to_inq_fileno_to_outq_fileno_to_synqPROC_ALIVE_TIMEOUT_proc_alive_timeoutr[   _waiting_to_start_all_inqueues_active_writes_active_writers_busy_workersrt   _mark_worker_as_availabler   outbound_bufferr   write_statsr   r   _pooloutqR_fdsynqW_fdgetattr_timeout_handlerr   r   r   )	r   	processesr   r   proc_alive_timeoutr~   r   r   r   s	   `       r,   r   zAsynPool.__init__  s    /22>3AC C(1(9DNN$$$y	
 
 
 
8=i8H8H
 
 

 !!! #5"<# 	  "% !UU "ee  #uu !UU)-);)C&  %ww"994T444V444J 	7 	7D 37D /26D //&!#4d 
  
  '!#4d 
  
r.   c                     t          j        |            t          j                     t	                                          |          S )N)sender)r   sendgccollectr   _create_worker_process)r   ir   s     r,   r  zAsynPool._create_worker_process  s:    $)6666

ww--a000r.   c                 Z    |                      ||           |                                  d S r0   )_untrack_child_processmaintain_pool)r   r   r   s      r,   _event_process_exitzAsynPool._event_process_exit  s0    ##D#...r.   c                     	 |j         }n5# t          $ r( t          j        |j        j                  x}|_         Y nw xY wt          |gd|j        | j        ||           dS )z4Helper method determines appropriate fd for process.N)	_sentinel_pollrM   osdup_popensentinelr   r   r  r   r   r   r'   s       r,   _track_child_processzAsynPool._track_child_process  s    	D$BB 	D 	D 	D
 (*vdk.B'C'CCB$$$	D 	(D$$c4	1 	1 	1 	1 	1s   
 /<<c                     |j         :|j         d c}|_         |                    |           t          j        |           d S d S r0   )r  r   r  closer  s       r,   r  zAsynPool._untrack_child_process  sE    *&*&94#B#JJrNNNHRLLLLL +*r.   c                      j                                         j         j         _                                                                                                         fd j        D              t           j	         j	        j
         j        d            j                                        D ]\  }}                    ||            j        s(j                             j                   d _        dS dS )z4Register the async pool with the current event loop.c                 <    g | ]}                     |          S r=   )r  )r>   rn   r   r   s     r,   rV   z5AsynPool.register_with_event_loop.<locals>.<listcomp>  s)    ???q	"	"1c	*	*???r.   rz   TN)_result_handlerr   r   handle_result_event_create_timelimit_handlers_create_process_handlers_create_write_handlersr  r   r   r   timersitemscall_repeatedly_registered_with_event_loopon_tickr_   on_poll_start)r   r   handlerintervals   ``  r,   r   z!AsynPool.register_with_event_loop  s+   55c:::#'#7#D '',,,%%c***##C((( 	@????DJ???? 	( $"6$f	. 	. 	. "&!2!2!4!4 	3 	3GX'2222 / 	4KOOD.////3D,,,	4 	4r.   c                      j         t                      x _         fd}| _        fd _        fd}| _        dS )z.Create handlers used to implement time limits.c                     |r$ |j         | j        ||          | j        <   d S |r! |j        | j                  | j        <   d S d S r0   )_on_soft_timeout_job_on_hard_timeout)re   softhard
call_laterr   r   trefss      r,   on_timeout_setz;AsynPool._create_timelimit_handlers.<locals>.on_timeout_set  ss      *
$/tS! !af   *
$/! !af r.   c                     	                      |           }|                                 ~d S # t          t          f$ r Y d S w xY wr0   )r   cancelr   rM   )rN   trefr4  s     r,   _discard_trefz:AsynPool._create_timelimit_handlers.<locals>._discard_tref)  sR    yy~~DDn-   s   */ AAc                 (     | j                    d S r0   )r/  )re   r9  s    r,   on_timeout_cancelz>AsynPool._create_timelimit_handlers.<locals>.on_timeout_cancel2  s    M!&!!!!!r.   N)r3  r   _tref_for_idr5  r9  r;  )r   r   r5  r;  r9  r3  r4  s   ``  @@@r,   r!  z#AsynPool._create_timelimit_handlers  s    ^
$7$9$99!	 	 	 	 	 	 	 	 -	 	 	 	 	 +	" 	" 	" 	" 	"!2r.   c                 ,   |r'|                     ||z
  | j        |          | j        |<   	 | j        |         }|                     |           n# t
          $ r Y nw xY w|s|                     |           d S d S # |s|                     |           w w xY wr0   )r3  r0  r<  _cacher   r   r9  )r   rN   r1  r2  r   results         r,   r.  zAsynPool._on_soft_timeout6  s     	%(^^tT2C& &Dc"		([%F   ((((  	 	 	D	
  (""3'''''( (4 (""3''''(s(   A A9 
AA9 AA9 9Bc                     	 | j         |         }|                     |           n# t          $ r Y nw xY w|                     |           d S # |                     |           w xY wr0   )r>  r   r   r9  )r   rN   r?  s      r,   r0  zAsynPool._on_hard_timeoutG  s    	$[%F   ((((  	 	 	D	 s#####Ds####s!   % A 
2A 2A A#c                 0    |                      |           d S r0   )r   )r   rN   r  objinqW_fds        r,   on_job_readyzAsynPool.on_job_readyS  s    &&w/////r.   c                 :   	
 j         j        j        c j         j         j        	 j        
 j         j         j	         j
         j        
fd
 fd}| _        dd	
 fd}| _        dS )z/Create handlers called on process up/down, etc.c                     |             } | p|                                  r^| v r\| j        v sJ | j                 | u sJ | j        j        v sJ t          d|            t	          j        | j        d           d S d S d S d S )Nz(Timed out waiting for UP message from %r	   )	_is_aliver  r`   errorr  killr   )r   r   r   waiting_to_starts    r,   verify_process_alivez?AsynPool._create_process_handlers.<locals>.verify_process_alivee  s    466D T^^%5%5 ,,,}6666%dm4<<<<}3333@$GGG!$$$$$ !   ,,r.   c                    | j         }                                D ]>}|j        r|j        j         |k    r| |_        |j        r|j        j         |k    r| |_        ?| | j        <                       |            t          | j        j                  rJ  | j        | j                   
	                    |            
                    j        	t          |                      dS )z"Called when a process has started.N)rC  values	_write_to_scheduled_forr  r  r   r   r   r_   r3  r   r   )r   infdrN   r   r   r   r   r   r   rL  rK  s      r,   on_process_upz8AsynPool._create_process_handlers.<locals>.on_process_upo  s    <D||~~ . .= )S]%:d%B%B$(CM% .#*<*D*L*L)-C&,0N4=) %%dC000!$)"344444 Jt}&94=III  &&&NN(*>D		    r.   Nc                     	 |                                  }n# t          $ r Y d S w xY w	 ||         |u r|                    |d             ||           | ||           n# t          $ r Y nw xY w|S r0   )r^   rq   r   r   )rB  r   index
remove_funr   r'   s         r,   _remove_from_indexz=AsynPool._create_process_handlers.<locals>._remove_from_index  s    ZZ\\   	!9$$IIb$''' 
2'HRLLL     Is    
%% A" "
A/.A/c                    t          | dd          rdS  	|             | j        j        | 
           | j        r | j        j        |             | j        j        | j                  }|r                    |                               |                                |            j                            | j	                    | j        j                    
| j        j                   | j
        r 
| j        j                   | j        r6j                            | j                    
| j        j                   dS dS )z#Called when a worker process exits.r   Nr   )r  r   r   synqrL   inqrt   r  r   rC  synqR_fdr  )r   rZ  rV  all_inqueuesbusy_workersfileno_to_inqr   fileno_to_synqr   process_flush_queuesr   remove_writerr   rK  s     r,   on_process_downz:AsynPool._create_process_handlers.<locals>.on_process_down  s   tVT**   &&&	!4   y ""I%t^]   %$ $}%-  C  *$$S)))''c222$$T***''555M$(*+++M$)+,,,} 1di/000} 1#++DM:::di/000001 1r.   r0   )r   r   ra  r>  r   r   r   r   r   r   r`  r   rR  rb  )r   r   rR  rb  rV  r   r\  r]  r   r^  r   r_  r   r`  r   ra  rL  rK  s   ``  @@@@@@@@@@@@@@r,   r"  z!AsynPool._create_process_handlersV  sW    NC-s/@ 	1
M= )+--)"6#81	% 	% 	% 	% 	% 	% 	%	 	 	 	 	 	 	 	 	 	 	 	8 +	 	 	 	*	1 	1 	1 	1 	1 	1 	1 	1 	1 	1 	1 	1 	1 	1 	1 	18  /r.   c                 T    !"#$  j          j         j        j        j        ! j         j         j        } j        j	        j
        j        j        cj        |j        j        |j        # j        j         j        $ j        t$          k    t&          j        "t*          j        t.                               t.          d          t2                               t2          d          i t4          j        f"fd	}| _        fd}| _        fd}| _         _        d!fd	}	|	_        !fd}
|
 _          fd#$fd	 #fd
}| _!        dfd	dS )z6Create handlers used to write data to child processes.)r   c                     | j         	| j        v rK| j        s(|                     d  |                         d            |                     | j                    d S | vr                    |            d S d S r0   )_terminatedcorrelation_id	_accepted_ack_set_terminated
appendleft)rN   _timegetpidoutboundrevoked_taskss     r,   	_put_backz2AsynPool._create_write_handlers.<locals>._put_back  s    *&-77} <HHT5577FFHHd;;;##CO44444 h&&'',,,,, '&r.   c                                 } r#	ot                    t                    k     }n	}|r$t          | d t          t          z  d           d S t          |            d S )NT)consolidate)r%   r   r   r   )
inactiveadd_condactive_writesr\  r]  diffhub_add
hub_removeis_fair_strategyrm  s
     r,   r)  z6AsynPool._create_write_handlers.<locals>.on_poll_start  s    tM**H   $#ML(9(9C<M<M(M# 8/lG%#+49 9 9 9 9 9 0lJ8 8 8 8 8r.   c                                          |            	 |          |u rB                    | d                                 |                                 |            d S d S # t          $ r Y d S w xY wr0   )rt   r   r   )r'   r   rt  r\  r]  r^  s     r,   on_inqueue_closez9AsynPool._create_write_handlers.<locals>.on_inqueue_close
  s       $$$ $,,!%%b$///!))"--- ((,,,,, -,    s   A
A& &
A43A4Nc                    |sdg}t          |           }t          |          D ]A}| |d         |z           }|dxx         dz  cc<   |v r)r|v r0|vr |           @	              }|j        s	 |         x}|_        n# t          $ r  |           Y zw xY w 
|||          }t          |          |_         |            |            |           	 t          |            ||           # t          $ r Y t          $ r!}|j
        t          j        k    r Y d }~d }~ww xY w# t          $ r            D ]}	 |	           Y  d S w xY wd S )Nr   r6   )r%   r   rg  rP  r   r   rL   r   r   rq   rr   EBADF
IndexError)	ready_fdstotal_write_count	num_readyr   ready_fdrN   r   corru   inqfd
_write_jobrt  
add_writerr\  r]  ru  r^  rw  rx  mark_worker_as_busymark_write_fd_as_activemark_write_gen_as_activepop_messageput_messages             r,   schedule_writesz8AsynPool._create_write_handlers.<locals>.schedule_writes  sN   $ (%&C! II9%% 36 36$%6q%9I%EF!!$$$)$$$},,# L(@(@<//Jx((('6%+--C = 6	% 9Fh8OOD3#5#5' % % % (K,,,$H% )jx==&)#hh00555//999++H5556 III 'Jx5555  - ! ! ! D& & & &"yEK77 %  87777&-6 "    "&m!4!4 * *"
5))))EEE36 36sB   &
D=8BB B &D
D8	D8D33D8=#E%$E%c                      |           }t          |          } d|          } | d         d                   }t          |          t          |          |f|_         	|           d S )Nprotocol>Ir6   r   )r%   r   _payload)
tupbodyr   headerrN   dumpsget_jobr	   r  r  s
        r,   send_jobz1AsynPool._create_write_handlers.<locals>.send_joba  s     5x000DD		IT$	**F'#a&)$$C%f--z$/?/?JCLKr.   c                     t                               d| | j        |           |                                 r|                                                      |                               |           d S )Nz"Process inqueue damaged: %r %r: %r)r   	exceptionexitcoderH  	terminater   ro  )r   r'   rN   ru   r   r   s       r,   on_not_recoveringz:AsynPool._create_write_handlers.<locals>.on_not_recoveringm  sr    4dDM3P P P~~ !   JJrNNNNN3r.   c              3     K   |j         \  }}}d}	 | |_        | j        }dx}}	|dk     rt	 | |||          z  }d}n[# t          $ rN}
t	          |
dd           t
          vr |dz  }|dk    r | |||
           t                      d V  Y d }
~
nd }
~
ww xY w|dk     t|	|k     rt	 |	 |||	          z  }	d}n[# t          $ rN}
t	          |
dd           t
          vr |dz  }|dk    r | |||
           t                      d V  Y d }
~
nd }
~
ww xY w|	|k     t |           | j        xx         dz  cc<                       |            |	                                           d S #  |           | j        xx         dz  cc<                       |            |	                                           w xY w)Nr   r7   rr   r6   d   )
r  rO  send_job_offset	Exceptionr  r   r   rT  rt   rL   )r   r'   rN   r  r  r   errorsr
  HwBwru   rt  rw  r  write_generator_doner   s              r,   r  z3AsynPool._create_write_handlers.<locals>._write_jobu  sl     
 '*l#FD)F*4 $+R1ff#dd62... "# %   "366gEE!!!C<<--dBSAAA"///1 1ff  9nn#dd4nn, "# %   "366gEE!!!C<<--dBSAAA"///1 9nn 
2DJ'''1,'''%%b)))$$S[[]]33333	 
2DJ'''1,'''%%b)))$$S[[]]3333s^   E, = E, 
BABE, BE, %B7 4E, 7
DAD
E, 
D	E, ,AG c                     t          |||                    }t                    } |||          } 
|            	|           |f|_         ||           d S )NrX  )rB   r   r~   )responser   rN   r'   msgr   r  
_write_ackr  r  r  precalcr  s          r,   send_ackz1AsynPool._create_write_handlers.<locals>.send_ack  s     c2wx011C344H*Rx888C$$S)))##B''' FHMJr3r.   c              3   Z  K   |d         \  }}}	 	 |          }n# t           $ r t                      w xY w|j        }dx}}	|dk     rK	 | |||          z  }n4# t          $ r'}
t	          |
dd           t
          vr d V  Y d }
~
nd }
~
ww xY w|dk     K|	|k     rK	 |	 |||	          z  }	n4# t          $ r'}
t	          |
dd           t
          vr d V  Y d }
~
nd }
~
ww xY w|	|k     K|r
 |                                 |            d S # |r
 |                                 |            w xY w)N   r   r7   rr   )r   r   send_syn_offsetr  r  r   rt   )r'   ackr   r  r  r   r   r
  r  r  ru   rt  r_  s              r,   r  z3AsynPool._create_write_handlers.<locals>._write_ack  s      '*!f#FD) **)"-DD * * * (//)* +R1ffdd62...$   "366gEE! 1ff 9nndd4nn,$   "366gEE!	 9nn  HJJJ%%b)))))  HJJJ%%b))))sl    D 4D 	A D 
B
#B D B

D B* )D *
C4CD C	D #D*r0   )"r   r   r   popleftr   r   r   r   r   
differencer  r_   r   rt   r>  __getitem__r   r   SCHED_STRATEGY_FAIRworker_staterevokedr  rl  r   _create_payloadr   timero  r)  rz  rw  consolidate_callback
_quick_putr  )%r   r   r	   r  r  active_writersro  r)  rz  r  r  r  r  r  rt  r  r\  r]  ru  r^  r_  r  rl  rv  rw  rx  r  r  r  r  rm  r  r  r  rn  r  r   s%   `````       @@@@@@@@@@@@@@@@@@@@@@@@@r,   r#  zAsynPool._create_write_handlers  s+    +-'&o)+-)&^
!gsz"/"3#1#5 *.-5+)&.2EE$,,,S$77--dD99; "& 	- 	- 	- 	- 	- 	- 	- 	- #	8 	8 	8 	8 	8 	8 	8 	8 	8 	8 	8 	8$ +
	 
	 
	 
	 
	 
	 
	 
	 !1$F	6 F	6 F	6 F	6 F	6 F	6 F	6 F	6 F	6 F	6 F	6 F	6 F	6 F	6 F	6 F	6 F	6 F	6 F	6N $3 		 		 		 		 		 		 		 		 		 #	  	  	  	  	  	 1	4 1	4 1	4 1	4 1	4 1	4 1	4 1	4 1	4f		  		  		  		  		  		  		  		  		  		  !%	* %	* %	* %	* %	* %	* %	* %	* %	*r.   c                    | j         t          k    rd S | j        r7| j                                        D ]}|j        s|                                 | j        r| j                                         | 	                                 	 | j         t          k    rt          dddd          }i }| j                                        D ]}t          |          }||||<   | j        s| j                                         n| j        rt          | j                  }|D ]}|j        dk    rXt!          |          rI	 ||         }|                                 n# t$          $ r Y nw xY w| j                            |           e	 ||         }|j        }|                                r|                     ||           |                                 # t$          $ r Y w xY w| j        | 	                                 t-          t/          |                     | j                                         | j                                         | j                                         | j                                         d S # | j                                         | j                                         | j                                         | j                                         w xY w)N{Gz?g?T)
repeatlastr  )r   r   r   r>  rN  rg  _cancelr   clearr  r   r   rP   r   rl   r   rJ   rt   r   rO  rH  _flush_writerr   r   r   r   )r   rN   	intervalsowned_byrO   ra   rI   job_procs           r,   flushzAsynPool.flush  s   ;)##F ; 	"{))++ " "} "KKMMM  	) &&(((5	' {c!!#D#tEEE	 ;--// / /C,S11F)+.(+  +K%%''''. 2"&t';"<"<#* 2 2C # < <$3C$8$8 !=
!2*23-C
 %(KKMMMM	 (0 !) !) !)$(D!)
 !% 4 < <S A A A A	!2*23-C 03}H'/'9'9';'; %J(,(:(:8S(I(I(I$'KKMMMM (0 !) !) !)$(D!)' . 28 &&((($y//*** &&((( &&(((%%'''$$&&&&&  &&((( &&(((%%'''$$&&&&sW   B3I> :EI> 
E$!I> #E$$I> GAI> 
GI> G:I> >A&K$c                 Z   |j         j        h}	 |ra|                                snLt          ||d          \  }}}|s2|s|r.	 t	          |           n# t
          t          t          f$ r Y nw xY w|a| j        	                    |           d S # | j        	                    |           w xY w)Nr   )ra   rb   rc   )
rZ  rL   rH  rw   r   r   rq   r   r   rt   )r   r   rO   fdsreadablewritableagains          r,   r  zAsynPool._flush_writer#  s    x 	1 
~~'' ,3S#- - -)(E  ( h V)7H=     
  ((00000D ((0000s/   3B A B A-*B ,A--B B*c                 b    t          d | j                                        D                       S )zGet queues for a new process.

        Here we'll find an unused slot, as there should always
        be one available when we start a new process.
        c              3   $   K   | ]\  }}||V  d S r0   r=   )r>   qowners      r,   	<genexpr>z.AsynPool.get_process_queues.<locals>.<genexpr>:  s3       & &(!U} $}}}& &r.   )r   r   r%  r   s    r,   get_process_queueszAsynPool.get_process_queues4  s?      & &dl&8&8&:&: & & & & & 	&r.   c                      t           j        t           j                  z
  d          }|r5 j                             fdt          |          D                        dS dS )z!Grow the pool by ``n`` processes.r   c                 :    i | ]}                                 d S r0   r   r   s     r,   rA   z$AsynPool.on_grow.<locals>.<dictcomp>A  s4     ! ! !78**,,d! ! !r.   N)max
_processesr%   r   updater   )r   r+   ru  s   `  r,   on_growzAsynPool.on_grow=  s    4?S%6%66:: 	L ! ! ! !<A$KK! ! !     	 	r.   c                     dS )z#Shrink the pool by ``n`` processes.Nr=   )r   r+   s     r,   	on_shrinkzAsynPool.on_shrinkE  s      r.   c                    t          d          }t          d          }d}t          |j                  sJ t          |j                  rJ t          |j                  rJ t          |j                  sJ | j        r<t          d          }t          |j                  sJ t          |j                  rJ |||fS )z5Create new in, out, etc. queues, returned as a tuple.T)	wnonblock)	rnonblockN)r   r   r   rL   r   )r   rZ  r   rY  s       r,   r   zAsynPool.create_process_queuesH  s    
 T***d+++#+&&&&&ck*****dl+++++$,'''''; 	0$///Ddl+++++!$,/////D$r.   c                    	 t          fd| j        D                       }n+# t          $ r t                              d          cY S w xY w|j        | j        vsJ |j        | j        vsJ | j        	                    |           || j        |j        <   || j
        |j        <   | j                            |j                   dS )zsCalled when receiving the :const:`WORKER_UP` message.

        Marks the process as ready to receive work.
        c              3   2   K   | ]}|j         k    |V  d S r0   )r   )r>   rn   r   s     r,   r  z,AsynPool.on_process_alive.<locals>.<genexpr>`  s)      >>a#>>r.   z"process with pid=%s already exitedN)r   r  r   r   r   rC  r   r   r   rt   r   r  r_   )r   r   r   s    ` r,   r   zAsynPool.on_process_aliveZ  s    
	M>>>>4:>>>>>DD 	M 	M 	M>>"FLLLLL	M|4#66666|4#55555&&t,,,,0DL).2T]+t|,,,,,s    $ %AAc                     |j         r6|j                                         s|                     ||j                    dS |j        r0|j                                        s|                     |           dS dS dS )z:Called for each job when the process assigned to it exits.N)rO  rH  on_partial_readrP  ro  )r   rN   pid_gones      r,   on_job_process_downzAsynPool.on_job_process_downj  s    = 	 !8!8!:!: 	   cm44444 	 (:(D(D(F(F 	  NN3	  	  	  	 r.   c                 2    |                      ||           dS )zCalled when the process executing job' exits.

        This happens when the process job'
        was assigned to exited by mysterious means (error exitcodes and
        signals).
        N)mark_as_worker_lost)r   rN   r   r  s       r,   on_job_process_lostzAsynPool.on_job_process_lostt  s      	  h/////r.   c           	          | j         dS t          | j                                                   }t          |          d  rt	          | j                   z  nd          d                    fd|D                       d                    t          t          |                    t          	                    | j
        | j
                  t	          | j                  t	          | j                  ddS )NzN/Ac                 2    | rt          |           |z  nddS )Nr   z.2f)float)r@   totals     r,   perz'AsynPool.human_write_stats.<locals>.per  s#    ,-4uQxx%''1:::r.   r   z, c              3   0   K   | ]} |          V  d S r0   r=   )r>   r@   r  r  s     r,   r  z-AsynPool.human_write_stats.<locals>.<genexpr>  s-      99qSSE]]999999r.   )r  active)r  avgallrawstrategyinqueues)r   rl   rN  sumr%   joinmapstrSCHED_STRATEGY_TO_NAMEr   r   r   r   )r   valsr  r  s     @@r,   human_write_statszAsynPool.human_write_stats}  s	   #5D$++--..D			; 	; 	; 3Dus4#344441eLL9999999D9999999Sd^^,,.22#T%8  T/00d122 
 
 	
r.   c                     |j         s8	 d| j        |                     |          <   dS # t          t          f$ r Y dS w xY wdS )z-Called to clean up queues after process exit.N)r   r   _find_worker_queuesr   r   r   r   s     r,   _process_cleanup_queuesz AsynPool._process_cleanup_queues  sb    y 	?CT55d;;<<<j)   	 	s   ( ==c                     | j         D ]u}	 t          |j        j        d           	 |j                            d           9# t
          $ r }|j        t          j        k    r Y d}~^d}~ww xY w# t
          $ r Y rw xY wdS )z>Called at shutdown to tell processes that we're shutting down.r6   N)r   r   rZ  rL   r   rq   rr   r|  )task_handlerr   ru   s      r,   _stop_task_handlerzAsynPool._stop_task_handler  s     !% 
	 
	D	DH,a000HLL&&&&   yEK// 0////    
	 
	s(   A.A
A+A&&A+.
A;:A;c                 ^    t                                          | j        | j                  S )N)r   r   )r   create_result_handlerr   r   )r   r   s    r,   r  zAsynPool.create_result_handler  s0    ww,,/!2 - 
 
 	
r.   c                     || j         v sJ t          | j                   }|| j         |<   |t          | j                   k    sJ dS )z;Mark new ownership for ``queues`` to update fileno indices.N)r   r%   )r   r   queuesbs       r,   _process_register_queuesz!AsynPool._process_register_queues  sQ    %%%%#VC%%%%%%%%r.   c                     	 t          fd| j                                        D                       S # t          $ r t	                    w xY w)z"Find the queues owned by ``proc``.c              3   .   K   | ]\  }}|k    |V  d S r0   r=   )r>   r  r  r   s      r,   r  z/AsynPool._find_worker_queues.<locals>.<genexpr>  s8       * *ha D== (===* *r.   )r   r   r%  r   r   r  s    `r,   r  zAsynPool._find_worker_queues  sv    	# * * * *$,*<*<*>*> * * * * * * 	# 	# 	#T"""	#s	   15 Ac                 L    d | _         d x| _        x| _        x| _        | _        d S r0   )r  _inqueue	_outqueue
_quick_get_poll_resultr  s    r,   _setup_queueszAsynPool._setup_queues  s7     
 37	7 	7 	7Od///r.   c                 B   |j         j        }| j        j        }|h}|r|j        s| j        t          k    rt          |d|d          \  }}}|r	 |                                }|t          d|           dS  ||           ny# t          t          f$ rc}t          |dd          }	|	t          j        k    rY d}~|	t          j        k    rY d}~dS |	t           vrt          d||d           Y d}~dS d}~ww xY wdS |r|j        s| j        t          k    dS dS dS dS dS dS )	a  Flush all queues.

        Including the outbound buffer, so that
        all tasks that haven't been started will be discarded.

        In Celery this is called whenever the transport connection is lost
        (consumer restart), and when a process is terminated.
        Nr  rc   z&got sentinel while flushing process %rrr   z got %r while flushing process %rr6   r   )r   r   r  r   closedr   r   rw   r   r   rq   r   r  rr   rs   EAGAINr   )
r   r   resqr   r  r  r   r   ru   rv   s
             r,   r`  zAsynPool.process_flush_queues  s    y .>f 	$+ 	$+*B*B$S$TBBBNHa .99;;D |FMMM'----  * 	 	 	$S'488F,, 5<//w..@!4!5 5 5 5EEEEE	" -  	$+ 	$+*B*B*B*B 	 	 	 	*B*B 	 	 	 	s$   B C7!C2:C2C22C7c                    |j         s|                     |           t          |          }|r| j                            |           ~|j        sd|_        t          | j                  }	 |                     |          }| 	                    ||          rd| j        | 
                                <   n# t          $ r Y nw xY wt          | j                  |k    sJ dS dS )z8Called when a job was partially written to exited child.TN)rg  ro  rP   r   rt   r   r%   r   r  destroy_queuesr   r   )r   rN   r   rO   beforer   s         r,   r  zAsynPool.on_partial_read  s    } 	 NN3 %% 	 ((000y 
	/DI&&F11$77&&vt44 FAEDL!;!;!=!=>   t|$$....
	/ 
	/ /.s   ,AB4 4
C Cc                    |                                 rJ | j                            |           d}	 | j                            |           n# t
          $ r d}Y nw xY w	 |                     |d         j                                        |           n# t          $ r Y nw xY w|D ]W}|rS|j
        |j        fD ]D}|j        s;|                     |           	 |                                 4# t          $ r Y @w xY wEX|S )zqDestroy queues that can no longer be used.

        This way they can be replaced by new usable sockets.
        r6   r   )rH  r   rt   r   r   r   rz  rL   r^   rq   r   r  rw  r  )r   r   r   removedqueuesocks         r,   r  zAsynPool.destroy_queues  sU   
 >>#####&&t,,,	LV$$$$ 	 	 	GGG		!!&)"3":":"<"<dCCCC 	 	 	D	 	! 	!E !"]EM: ! !D; !---! JJLLLL& ! ! ! D!	! s5   A AA"3B 
B#"B#C..
C;:C;c                 `     |||f|          }t          |          } |d|          }|||fS )Nr  r  )r%   )	r   type_r~   r  r	   r  r  r)   r  s	            r,   r  zAsynPool._create_payload#  sE     ueT]X6664yydD!!tT!!r.   c                     d S r0   r=   )clsr  r  s      r,   _set_result_sentinelzAsynPool._set_result_sentinel+  s	     	r.   c                     | j         fS r0   )r  r  s    r,   _help_stuff_finish_argsz AsynPool._help_stuff_finish_args0  s     
}r.   c                    t          d           i }t                      }|D ]K}	 |j        j                                        }|                    |           |||<   <# t          $ r Y Hw xY w|rXt          |d          \  }}}|r|sd S |D ]&}||         j        j                                         't          d           |Vd S d S )Nz7removing tasks from inqueue until task handler finishedr   r  r   )
r   r[   rZ  r   r^   r_   rq   rw   r   r   )	r  r   fileno_to_procinqRrn   r'   r  r   r  s	            r,   _help_stuff_finishzAsynPool._help_stuff_finish5  s    	E	
 	
 	
 uu 	 	AU]))++%&r""    	!(s!;!;!;Ha   6 6r"&.335555!HHH  	 	 	 	 	s   8A
A+*A+c                     | j         diS )Ng      @)r  r  s    r,   r$  zAsynPool.timersN  s    "C((r.   )NFNN)4r   r   r   r   r   r   r'  r   r   r  r  r  r  r   r!  r.  r0  rD  r"  r	   r   r  r   r#  r  r  r  r  r  r   r   r  r  r  r  staticmethodr  r  r  r  r
  r`  r  r  r  classmethodr  r  r!  propertyr$  r   r   s   @r,   r4   r4     s       $$!MF #(    
 /49=<
 <
 <
 <
 <
 <
|1 1 1 1 1
  
1 1 1  4 4 463 3 3:( ( ("
$ 
$ 
$0 0 0h/ h/ h/V %)(8Y* Y* Y* Y*vF' F' F'P1 1 1"& & &  2 2 2  $- - -      0 0 0
 
 
.     \
 
 
 
 
& & &# # #7 7 7" " "H/ / /4  8 &m$!1" " " "   [  
   [0 ) ) X) ) ) ) )r.   r4   )NNNr   )_r   rr   r  rG   r  rk   r  collectionsr   r   r   ior   numbersr   r   r   structr	   r
   r   r   weakrefr   r   billiardr   r  billiard.compatr   r   billiard.poolr   r   r   r   r   billiard.queuesr   kombu.asynchronousr   r   kombu.serializationr   kombu.utils.eventior   kombu.utils.functionalr   viner   celery.signalsr   celery.utils.functionalr   celery.utils.logr    celery.workerr!   r  	_billiardr"   r-   r   ImportError__all__r   r   rI  r   	frozensetr  rs   r   r   r   SCHED_STRATEGY_FCFSr  r   r%  r  rB   rJ   rP   r   rQ   rT   rX   rZ   ri   rw   r   r   r   Poolr4   r=   r.   r,   <module>r>     sh     				  				   2 2 2 2 2 2 2 2 2 2             # # # # # # , , , , , , , , , ,       , , , , , , , , " " " " " " 3 3 3 3 3 3 3 3 B B B B B B B B B B B B B B ( ( ( ( ( ( ) ) ) ) ) ) ) ) 1 1 1 1 1 1 - - - - - - * * * * * *       7 7 7 7 7 7 ( ( ( ( ( ( ' ' ' ' ' ' / / / / / /
-******JJ - - -%'W     J'- - - - - - -- 	H		|V\u
)U\5;/
0
0 	     "   ED+;+A+A+C+CDDD j/00; ; ;
   766   $D!V]"NFN    6    $D!- - - -`*0 *0 *0Z+ + + + +U\ + + +\" \" \" \" \"E' \" \" \"~}) }) }) }) })uz }) }) }) }) })s   *B3 3CC