
    ^f4                       d Z ddlmZ ddlZddlZddlZddlmZ ddlm	Z	 ddl
mZ ddlmZ ddlmZmZmZmZ dd	lmZmZ d
dlmZmZ d
dlmZ d
dlmZ d
dlmZ dZdZ  ee!          Z"da#d Z$d Z%d)dZ& G d de          Z'd Z(d*dZ)d Z*d Z+d Z,d+dZ-	 	 d+dZ.d,dZ/	 d-d Z0d! Z1d" Z2e	d#             Z3d.d$Z4d.d%Z5d/d&Z6 G d' d(          Z7dS )0zCommon Utilities.    )annotationsN)deque)contextmanager)partial)count)NAMESPACE_OIDuuid3uuid4uuid5)ChannelErrorRecoverableConnectionError   )ExchangeQueue)
get_logger)registry)uuid)		Broadcastmaybe_declarer   itermessages
send_replycollect_repliesinsureddrain_consumer	eventloopi  c                 D    t           t                      j        a t           S N)_node_idr
   int     >/var/www/html/env/lib/python3.11/site-packages/kombu/common.pyget_node_idr#   "   s    77;Or!   c                    d                     | ||t          |                    }	 t          t          t          |                    }n2# t
          $ r% t          t          t          |                    }Y nw xY w|S )Nz{:x}-{:x}-{:x}-{:x})formatidstrr	   r   
ValueErrorr   )node_id
process_id	thread_idinstanceentrets         r"   generate_oidr/   )   s    

&
&Y86 6C-%s++,, - - -%s++,,-Js   "A
 
,A98A9Tc                    t          t                      t          j                    |rt	          j                    nd|           S Nr   )r/   r#   osgetpid	threading	get_ident)r,   threadss     r"   oid_fromr7   3   s=    
	!(/	a	  r!   c                  D     e Zd ZdZej        dz   Z	 	 	 	 	 	 d fd	Z xZS )r   a  Broadcast queue.

    Convenience class used to define broadcast queues.

    Every queue instance will have a unique name,
    and both the queue and exchange is configured with auto deletion.

    Arguments:
    ---------
        name (str): This is used as the name of the exchange.
        queue (str): By default a unique id is used for the queue
            name for every consumer.  You can specify a custom
            queue name here.
        unique (bool): Always create a unique queue
            even if a queue name is supplied.
        **kwargs (Any): See :class:`~kombu.Queue` for a list
            of additional keyword arguments supported.
    ))queueNNFTc                    |r%d                     |pdt                                }n|pdt                       } t                      j        d|p||||||nt	          |d          d| d S )Nz{}.{}bcastzbcast.fanout)type)aliasr9   nameauto_deleteexchanger    )r%   r   super__init__r   )	selfr?   r9   uniquer@   rA   r>   kwargs	__class__s	           r"   rC   zBroadcast.__init__R   s      	/NN5#3GTVV<<EE..dff..E 	
-4#"*"6hh#Dx888	
 	
 	
 	
 	
 	
 	
r!   )NNFTNN)__name__
__module____qualname____doc__r   attrsrC   __classcell__)rG   s   @r"   r   r   <   sj         & K,,E !
 
 
 
 
 
 
 
 
 
r!   r   c                (    | |j         j        j        v S r   )
connectionclientdeclared_entities)entitychannels     r"   declaration_cachedrT   i   s    W'.@@@r!   Fc                B    |rt          | |fi |S t          | |          S )zDeclare entity (cached).)_imaybe_declare_maybe_declare)rR   rS   retryretry_policys       r"   r   r   m   s3     @vw??,???&'***r!   c                t    | j         }|s.|st          d| d|            |                     |          } | S dS )zMake sure the channel is bound to the entity.

    :param entity: generic kombu nomenclature, generally an exchange or queue
    :param channel: channel to bind to the entity
    :return: the updated entity
    zCannot bind channel z to entity N)is_boundr   bind)rR   rS   r[   s      r"   _ensure_channel_is_boundr]   t   sd     H  	ECwCC6CCE E EW%% r!   c                   | }t          | |           |!| j        st          d|  d          | j        }d x}}|j        r-| j        r&|j        j        j        }t          |           }||v rdS |j        st          d          | 
                    |           ||r|                    |           || j        |_        dS )Nzchannel is None and entity z not bound.Fchannel disconnected)rS   T)r]   r[   r   rS   rO   can_cache_declarationrP   rQ   hashr   declareaddr?   )rR   rS   origdeclaredidents        r"   rW   rW      s    DVW--- 	CAfAAAC C C.Hu f: %,>VH5 A()?@@@
NN7N###UK	4r!   c                    t          | |           | j        j        st          d            | j        j        j        j        | t          fi || |          S )Nr_   )r]   rS   rO   r   rP   ensurerW   )rR   rS   rY   s      r"   rV   rV      sw    VW--->$ A()?@@@026>$+20 0".0 006A A Ar!   c              #    K   t                      fd}|g|pg z   | _        | 5  t          | j        j        j        ||d          D ])}	                                 V  # t          $ r Y &w xY w	 ddd           dS # 1 swxY w Y   dS )z&Drain messages from consumer instance.c                6                         | |f           d S r   )append)bodymessageaccs     r"   
on_messagez"drain_consumer.<locals>.on_message   s    

D'?#####r!   T)limittimeoutignore_timeoutsN)r   	callbacksr   rS   rO   rP   popleft
IndexError)consumerrp   rq   rs   ro   _rn   s         @r"   r   r      s     
''C$ $ $ $ $ %b9H	  8+6=!&O O O 	 	Akkmm####   		                 s5   %BA#"B#
A0-B/A00BBBc                F    t           | j        d|g|d||||          S )zIterator over messages.)queuesrS   )rp   rq   rs   r    )r   Consumer)connrS   r9   rp   rq   rs   rF   s          r"   r   r      s@     @eWg@@@@W	   r!   c              #     K   |rt          |          pt                      D ]5}	 |                     |          V  # t          j        $ r |r|s Y 2w xY wdS )a   Best practice generator wrapper around ``Connection.drain_events``.

    Able to drain events forever, with a limit, and optionally ignoring
    timeout errors (a timeout of 1 is often used in environments where
    the socket can get "stuck", and is a best practice for Kombu consumers).

    ``eventloop`` is a generator.

    Examples
    --------
        >>> from kombu.common import eventloop

        >>> def run(conn):
        ...     it = eventloop(conn, timeout=1, ignore_timeouts=True)
        ...     next(it)   # one event consumed, or timed out.
        ...
        ...     for _ in eventloop(conn, timeout=1, ignore_timeouts=True):
        ...         pass  # loop forever.

    It also takes an optional limit parameter, and timeout errors
    are propagated by default::

        for _ in eventloop(connection, limit=1, timeout=1):
            pass

    See Also
    --------
        :func:`itermessages`, which is an event loop bound to one or more
        consumers, that yields any messages received.
    )rq   N)ranger   drain_eventssocketrq   )r{   rp   rq   rr   is        r"   r   r      s      > #uU||.uww  	##G#444444~ 	 	 	  	 s   >AAc                     |j         |f| ||dt          |j        d         |j                            d          t          j        |j                 |j        dfi |S )a  Send reply for request.

    Arguments:
    ---------
        exchange (kombu.Exchange, str): Reply exchange
        req (~kombu.Message): Original request, a message with
            a ``reply_to`` property.
        producer (kombu.Producer): Producer instance
        retry (bool): If true must retry according to
            the ``reply_policy`` argument.
        retry_policy (Dict): Retry settings.
        **props (Any): Extra properties.
    )rA   rX   rY   reply_tocorrelation_id)routing_keyr   
serializercontent_encoding)publishdict
propertiesgetserializerstype_to_namecontent_typer   )rA   reqmsgproducerrX   rY   propss          r"   r   r      s     8,  s~j9"%."4"45E"F"F)6s7GH$'$8: : D D >CD D  r!   c              /  &  K   |                     dd          }d}	 t          | ||g|R i |D ]!\  }}|s|                                 d}|V  "	 |r|                    |j                   dS dS # |r|                    |j                   w w xY w)z,Generator collecting replies from ``queue``.no_ackTFN)
setdefaultr   ackafter_reply_message_receivedr?   )	r{   rS   r9   argsrF   r   receivedrl   rm   s	            r"   r   r     s      x..FH	=)$ ;+/; ; ;39; ; 	 	MD' HJJJJ	  	=00<<<<<	= 	=8 	=00<<<<	=s   4A1 1Bc                B    t                               d| |d           d S )Nz#Connection error: %r. Retry in %ss
T)exc_info)loggererror)excintervals     r"   _ensure_errbackr     s1    
LL.X      r!   c              #  F   K   	 d V  d S # | j         | j        z   $ r Y d S w xY wr   )connection_errorschannel_errors)r{   s    r"   _ignore_errorsr     sE      !D$77   s   
   c                    |r/t          |           5   ||i |cddd           S # 1 swxY w Y   t          |           S )a  Ignore connection and channel errors.

    The first argument must be a connection object, or any other object
    with ``connection_error`` and ``channel_error`` attributes.

    Can be used as a function:

    .. code-block:: python

        def example(connection):
            ignore_errors(connection, consumer.channel.close)

    or as a context manager:

    .. code-block:: python

        def example(connection):
            with ignore_errors(connection):
                consumer.channel.close()


    Note:
    ----
        Connection and channel errors should be properly handled,
        and not ignored.  Using this function is only acceptable in a cleanup
        phase, like when a connection is lost or at shutdown.
    N)r   )r{   funr   rF   s       r"   ignore_errorsr   %  s    8  (D!! 	( 	(3'''	( 	( 	( 	( 	( 	( 	( 	( 	( 	( 	( 	( 	( 	( 	( 	($s   '++c                $    |r ||           d S d S r   r    )rO   rS   	on_revives      r"   revive_connectionr   G  s*     	' r!   c           	     8   |pt           }|                     d          5 }|                    |           |j        }t	          t
          ||          }	 |j        ||f||	d|}
 |
|i t          ||          \  }}|cddd           S # 1 swxY w Y   dS )zFunction wrapper to handle connection errors.

    Ensures function performing broker commands completes
    despite intermittent connection failures.
    T)block)errback)r   )r   r   )rO   N)r   acquireensure_connectiondefault_channelr   r   	autoretryr   )poolr   r   rF   r   r   optsr{   rS   reviver   retvalrw   s                r"   r   r   L  s    (G	D	!	! 	Tw/// &*DIFFF $.g ;w+1; ;59; ;GTCT&T%B%B%BCC		 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	s   A"BBBc                  8    e Zd ZdZdZd Zd	dZd	dZd Zd Z	dS )
QoSa  Thread safe increment/decrement of a channels prefetch_count.

    Arguments:
    ---------
        callback (Callable): Function used to set new prefetch count,
            e.g. ``consumer.qos`` or ``channel.basic_qos``.  Will be called
            with a single ``prefetch_count`` keyword argument.
        initial_value (int): Initial prefetch count value..

    Example:
    -------
        >>> from kombu import Consumer, Connection
        >>> connection = Connection('amqp://')
        >>> consumer = Consumer(connection)
        >>> qos = QoS(consumer.qos, initial_prefetch_count=2)
        >>> qos.update()  # set initial

        >>> qos.value
        2

        >>> def in_some_thread():
        ...     qos.increment_eventually()

        >>> def in_some_other_thread():
        ...     qos.decrement_eventually()

        >>> while 1:
        ...    if qos.prev != qos.value:
        ...        qos.update()  # prefetch changed so update.

    It can be used with any function supporting a ``prefetch_count`` keyword
    argument::

        >>> channel = connection.channel()
        >>> QoS(channel.basic_qos, 10)


        >>> def set_qos(prefetch_count):
        ...     print('prefetch count now: %r' % (prefetch_count,))
        >>> QoS(set_qos, 10)
    Nc                V    || _         t          j                    | _        |pd| _        d S r1   )callbackr4   RLock_mutexvalue)rD   r   initial_values      r"   rC   zQoS.__init__  s(     o''"'a


r!   r   c                    | j         5  | j        r| j        t          |d          z   | _        ddd           n# 1 swxY w Y   | j        S )zIncrement the value, but do not update the channels QoS.

        Note:
        ----
            The MainThread will be responsible for calling :meth:`update`
            when necessary.
        r   N)r   r   maxrD   ns     r"   increment_eventuallyzQoS.increment_eventually  s     [ 	4 	4z 4!Z#a))3
	4 	4 	4 	4 	4 	4 	4 	4 	4 	4 	4 	4 	4 	4 	4 zs   %9= =c                    | j         5  | j        r"| xj        |z  c_        | j        dk     rd| _        ddd           n# 1 swxY w Y   | j        S )zDecrement the value, but do not update the channels QoS.

        Note:
        ----
            The MainThread will be responsible for calling :meth:`update`
            when necessary.
        r   N)r   r   r   s     r"   decrement_eventuallyzQoS.decrement_eventually  s     [ 	# 	#z #

a

:>>!"DJ		# 	# 	# 	# 	# 	# 	# 	# 	# 	# 	# 	# 	# 	# 	#
 zs   *>AAc                    || j         k    rg|}|t          k    r"t                              dt                     d}t                              d|           |                     |           || _         |S )z#Set channel prefetch_count setting.z(QoS: Disabled: prefetch_count exceeds %rr   zbasic.qos: prefetch_count->%s)prefetch_count)prevPREFETCH_COUNT_MAXr   warningdebugr   )rD   pcount	new_values      r"   setzQoS.set  sv    TYI***I13 3 3	LL8)DDDMMM333DIr!   c                x    | j         5  |                     | j                  cddd           S # 1 swxY w Y   dS )z)Update prefetch count with current value.N)r   r   r   )rD   s    r"   updatez
QoS.update  s    [ 	( 	(88DJ''	( 	( 	( 	( 	( 	( 	( 	( 	( 	( 	( 	( 	( 	( 	( 	( 	( 	(s   /33)r   )
rH   rI   rJ   rK   r   rC   r   r   r   r   r    r!   r"   r   r   `  s{        ( (T D( ( (
        ( ( ( ( (r!   r   )T)NF)r   NN)NNF)NFNr   )NN)8rK   
__future__r   r2   r   r4   collectionsr   
contextlibr   	functoolsr   	itertoolsr   r   r   r	   r
   r   amqpr   r   rR   r   r   logr   serializationr   r   
utils.uuid__all__r   rH   r   r   r#   r/   r7   r   rT   r   r]   rW   rV   r   r   r   r   r   r   r   r   r   r   r   r    r!   r"   <module>r      s     " " " " " " 				            % % % % % %             3 3 3 3 3 3 3 3 3 3 3 3 9 9 9 9 9 9 9 9 # # # # # # # #       2 2 2 2 2 2        	H		       *
 *
 *
 *
 *
 *
 *
 *
ZA A A+ + + +     :A A A   $ 9=   $ $ $ $P 9=   2= = =             D   
   (^( ^( ^( ^( ^( ^( ^( ^( ^( ^(r!   