
    ^f                    h   d Z ddlmZ ddlZddlZddlZddlmZ ddlm	Z	m
Z
 ddlmZ ddlmZmZ 	 ddlmZ d	Zn# e$ r dZd
ZY nw xY wddlmZ ddlmZ ddlmZ ddlmZmZ ddlmZ ddlm Z m!Z!m"Z"m#Z# ddl$m%Z% ddl&m'Z'm(Z(m)Z)m*Z*m+Z+ er$ddl,m-Z- ej.        dk     rddl/m0Z0 nddlm0Z0 ddl1m2Z2 dZ3 ee4          Z5e
Z6dddZ7e6e#dZ8ej9        :                    dd
          Z;ej9        :                    dd
          Z< G d d          Z=e=Z> G d  d!e          Z? G d" d#e          Z@d-d(ZAd.d,ZBdS )/zClient (Connection).    )annotationsN)contextmanager)countcycle)
itemgetter)TYPE_CHECKINGAny)	CERT_NONETF)
exceptions   )
get_logger)Resource)get_transport_clssupports_librabbitmq)	HashedSeq)
dictfilterlazyretry_over_timeshufflecycle)cached_property)as_urlmaybe_sanitize_url	parse_urlquoteurlparse)Channel)   
   )	TypeGuard)TracebackType)
ConnectionConnectionPoolChannelPoolamqp)pyamqplibrabbitmq)round-robinshuffleKOMBU_LOG_CONNECTIONKOMBU_LOG_CHANNELc            	      J   e Zd ZdZdZdZdZdZdZdZ	dZ
dZdZdZdZdZdZdZeZeZdxZxZxZxZZ	 	 	 	 	 	 dZd	Zd
 Zd Zd Zd Zd Zd Zd Z d[dZ!d Z"d Z#d Z$d Z%d Z&d\dZ'd Z(e(Z)d Z*	 	 	 	 d]dZ+e,e-j.        e-j.        fd            Z/e,d             Z0d  Z1d! Z2	 	 	 d^d#Z3d\d$Z4d% Z5d& Z6d' Z7d( Z8d_d)Z9d* Z:d+ Z;dd, e<d-d.d/d0d1          fd`d4Z=d\d5Z>d\d6Z?d\d7Z@dad8ZA	 	 	 dbd9ZB	 	 	 dbd:ZCd; ZDd< ZEd= ZFd> ZGd? ZHd@ ZIdcdHZJeKdI             ZLdJ ZMeKdK             ZNeKdL             ZOdM ZPeKdddO            ZQeKdP             ZReKdQ             ZSeTdR             ZUdS ZVeTdT             ZWeTdU             ZXeTdV             ZYeTdW             ZZeKdX             Z[eKdY             Z\dS )er!   af
  A connection to the broker.

    Example:
    -------
        >>> Connection('amqp://guest:guest@localhost:5672//')
        >>> Connection('amqp://foo;amqp://bar',
        ...            failover_strategy='round-robin')
        >>> Connection('redis://', transport_options={
        ...     'visibility_timeout': 3000,
        ... })

        >>> import ssl
        >>> Connection('amqp://', login_method='EXTERNAL', ssl={
        ...    'ca_certs': '/etc/pki/tls/certs/something.crt',
        ...    'keyfile': '/etc/something/system.key',
        ...    'certfile': '/etc/something/system.cert',
        ...    'cert_reqs': ssl.CERT_REQUIRED,
        ... })

    Note:
    ----
        SSL currently only works with the py-amqp, qpid and redis
        transports.  For other transports you can use stunnel.

    Arguments:
    ---------
        URL (str, Sequence): Broker URL, or a list of URLs.

    Keyword Arguments:
    -----------------
        ssl (bool/dict): Use SSL to connect to the server.
            Default is ``False``.
            May not be supported by the specified transport.
        transport (Transport): Default transport if not specified in the URL.
        connect_timeout (float): Timeout in seconds for connecting to the
            server. May not be supported by the specified transport.
        transport_options (Dict): A dict of additional connection arguments to
            pass to alternate kombu channel implementations.  Consult the
            transport documentation for available options.
        heartbeat (float): Heartbeat interval in int/float seconds.
            Note that if heartbeats are enabled then the
            :meth:`heartbeat_check` method must be called regularly,
            around once per second.

    Note:
    ----
        The connection is established lazily when needed. If you need the
        connection to be established, then force it by calling
        :meth:`connect`::

            >>> conn = Connection('amqp://')
            >>> conn.connect()

        and always remember to close the connection::

            >>> conn.release()

    These options have been replaced by the URL argument, but are still
    supported for backwards compatibility:

    :keyword hostname: Host name/address.
        NOTE: You cannot specify both the URL argument and use the hostname
        keyword argument at the same time.
    :keyword userid: Default user name if not provided in the URL.
    :keyword password: Default password if not provided in the URL.
    :keyword virtual_host: Default virtual host if not provided in the URL.
    :keyword port: Default port if not provided in the URL.
    N/   Fr'   	localhostr   c                .   |g n|}|||||||||	||dx}| _         |rHt          |t                    s3|                    |           |d         }|                    |           |rd|v r6|                    d          |z   }|d         }|                    |           d|v rLd|d |                    d                   v r-|                    dd          \  |d<   |d	<   |d         | _        nld|v rh|pt          |          j	        }t          |          j        s9t          |          }|                    t          |          |d	                    ||d<    | j        di | || _        |pd
| _        | j                            | j                  p| j        | _        | j        r3|                     | j                  | _        t+          | j                   |
i }
|
| _        t.          rd| _        |r|| _        t3                      | _        d S )N)hostnameuseridpasswordvirtual_hostportinsistssl	transportconnect_timeoutlogin_method	heartbeatr   )r0   ;://+r   r7   r0   r'   T )_initial_params
isinstancestrextendupdatesplitindex
uri_prefixr   schemer   can_parse_urlr   r   _init_paramsalt_failover_strategyfailover_strategiesgetfailover_strategyr   nexttransport_options_log_connection_loggersetdeclared_entities)selfr0   r1   r2   r3   r4   r5   r6   r7   r8   rP   r9   rF   r:   rN   
alternateskwargsrJ   params
url_paramss                       B/var/www/html/env/lib/python3.11/site-packages/kombu/connection.py__init__zConnection.__init__   s    &bbJ !F ,F3"(y)
 )
 	
%  	-Jx55 	-JJx   1vHMM8M,,, 	0hnnS))C/q6x000  SH5KhnnU6K6K5K,L%L%L NN3** 8{#VJ%7"("5(""%B(););)B	(33A !*8!4!4JMM":..!+J!7 "   
 '0{###F###  #4"D}!%!9!=!=#"% "% "@(,(? 	8 	//99DJ$ "!2 	 DL 	)(DO!$    c           	         |                                   | j                                         d| _        d|v rt	          |          nd|i} | j        di t          | j        fi | dS )zSwitch connection parameters to use a new URL or hostname.

        Note:
        ----
            Does not reconnect!

        Arguments:
        ---------
            conn_str (str): either a hostname or URL.
        Fr<   r0   Nr>   )closerT   clear_closedr   rI   dictr?   )rU   conn_strconn_paramss      rZ   switchzConnection.switch   s     	

$$&&&#(H#4#4Ih:x:P 	 	FFD!5EEEEFFFFFr\   c                f    | j         r)|                     t          | j                              dS dS )z:Switch to next URL given by the current failover strategy.N)r   rd   rO   rU   s    rZ   maybe_switch_nextzConnection.maybe_switch_next   s8    : 	*KKTZ(()))))	* 	*r\   c                r   |pd}|dk    rt                      rd}|dk    r,t          r%|s#t                              d           dt          i}|| _        || _        || _        |
| _        |p| j	        | _	        |p| j
        | _
        || _        |	| _        || _        || _        |ot          |          | _        d S )Nr$   r&   redisszaSecure redis scheme specified (rediss) with no ssl options, defaulting to insecure SSL behaviour.ssl_cert_reqs)r   ssl_availableloggerwarningr
   r0   r1   r2   r9   r3   r4   r5   r8   r6   transport_clsfloatr:   )rU   r0   r1   r2   r3   r4   r5   r6   r7   r8   r9   r:   s               rZ   rI   zConnection._init_params  s     '	#7#9#9%I  ] 3 NNA   #I.C  ((=D,=%DI	.&"7uY'7'7r\   c                F    | j                             | j        |           d S N)r7   register_with_event_loop
connection)rU   loops     rZ   rr   z#Connection.register_with_event_loop  s"    //FFFFFr\   c                    | j         rHd}t          j        |                    t	          |           t          |                    g|R i | d S d S )Nz [Kombu connection:{id:#x}] {msg})idmsg)rR   rl   debugformatrv   rA   )rU   rw   argsrW   fmts        rZ   _debugzConnection._debug   sq    < 	*4CLr$xxSXX>> ** * *"(* * * * *	* 	*r\   c                0    |                      dd          S )z+Establish connection to server immediately.r   Fmax_retriesreraise_as_library_errors_ensure_connectionrf   s    rZ   connectzConnection.connect&  s$    &&U ' 
 
 	
r\   c                    |                      d           | j                            | j                  }t          rddlm}  ||dd          S |S )z Create and return a new channel.zcreate channelr   )
Logwrappedzkombu.channelz[Kombu channel:{0.channel_id}] )r|   r7   create_channelrs   _log_channelutils.debugr   )rU   chanr   s      rZ   channelzConnection.channel,  sj    $%%%~,,T_== 	A//////:dO?A A Ar\      c                D    | j                             | j        |          S )a  Check heartbeats.

        Allow the transport to perform any periodic tasks
        required to make heartbeats work.  This should be called
        approximately every second.

        If the current transport does not support heartbeats then
        this is a noop operation.

        Arguments:
        ---------
            rate (int): Rate is how often the tick is called
                compared to the actual heartbeat value.  E.g. if
                the heartbeat is set to 3 seconds, and the tick
                is called every 3 / 2 seconds, then the rate is 2.
                This value is currently unused by any transports.
        )rate)r7   heartbeat_checkrs   )rU   r   s     rZ   r   zConnection.heartbeat_check6  s!    $ ~--doD-IIIr\   c                2     | j         j        | j        fi |S )zWait for a single event from the server.

        Arguments:
        ---------
            timeout (float): Timeout in seconds before we give up.

        Raises
        ------
            socket.timeout: if the timeout is exceeded.
        )r7   drain_eventsrs   rU   rW   s     rZ   r   zConnection.drain_eventsJ  s#     +t~*4?EEfEEEr\   c                b    	 |                                  dS # | j        | j        z   $ r Y dS w xY w)z>Close given channel, but ignore connection and channel errors.N)r^   connection_errorschannel_errorsrU   r   s     rZ   maybe_close_channelzConnection.maybe_close_channelW  sE    	MMOOOOO&)<< 	 	 	DD	s    ..c                $   | j                                          | j        r|                     | j                   | j        rN	 | j                            | j                   n$# | j        t          t          j
        fz   $ r Y nw xY wd | _        d S d S rq   )rT   r_   _default_channelr   _connectionr7   close_connectionr   AttributeErrorsocketerrorrf   s    rZ   _do_close_selfzConnection._do_close_self^  s    $$&&&  	<$$T%:;;; 	$//0@AAAA)^V\,JJ   #D	$ 	$s   A# #BBc                    |                                   |                                  |                     d           d| _        dS )z;Really close connection, even if part of a connection pool.closedTN)r   _do_close_transportr|   r`   rf   s    rZ   _closezConnection._closej  sC      """Hr\   c                >    | j         rd | j         _        d | _         d S d S rq   )
_transportclientrf   s    rZ   r   zConnection._do_close_transportq  s+    ? 	#%)DO""DOOO	# 	#r\   c                   	 | j         j        } || j                   n# t          $ r t	          j                    }t	          j        |           	 |                                  n# t          j        $ r Y nw xY wt	          j        |           n# t	          j        |           w xY wY nw xY w| 	                                 | j
                                         d | _        d S rq   )r   _collectr   r   r   getdefaulttimeoutsetdefaulttimeoutr   timeoutr   rT   r_   )rU   socket_timeoutgc_transport_timeos       rZ   collectzConnection.collectv  s   	+?3L L)****  	1 	1 	1-//F$^4441##%%%%>    (0000(000000	1 	  """$$&&&sE    1B,A&%B&A85B7A88B;B,B&&B,+B,c                .    |                                   dS )zClose the connection (if open).Nr   rf   s    rZ   releasezConnection.release  s    r\   c                      | j         |i | | S )ztPublic interface of _ensure_connection for retro-compatibility.

        Returns kombu.Connection instance.
        r   rU   rz   rW   s      rZ   ensure_connectionzConnection.ensure_connection  s!    
 	 0000r\      Tc	                      j         r j        S d fd	}	 j        }
|s j        }
 |
            5  t	           j         j        di |	||||||          cddd           S # 1 swxY w Y   dS )a  Ensure we have a connection to the server.

        If not retry establishing the connection with the settings
        specified.

        Arguments:
        ---------
            errback (Callable): Optional callback called each time the
                connection can't be established.  Arguments provided are
                the exception raised and the interval that will be
                slept ``(exc, interval)``.

            max_retries (int): Maximum number of times to retry.
                If this limit is exceeded the connection error
                will be re-raised.

            interval_start (float): The number of seconds we start
                sleeping for.
            interval_step (float): How many seconds added to the interval
                for each retry.
            interval_max (float): Maximum number of seconds to sleep between
                each retry.
            callback (Callable): Optional callback that is called for every
                internal iteration (1 s).
            timeout (int): Maximum amount of time in seconds to spend
                attempting to connect, total over all retries.
        r   c                                         |          }|rt          |          }r | |                                            |r|ndS Nr   )completes_cyclerO   rg   )exc	intervalsretriesintervalrounderrbackrU   s        rZ   on_errorz/Connection._ensure_connection.<locals>.on_error  sd    ((11E +	?? 'X&&&""$$$$+88!+r\   r>   )r   N)r   )	connectedr   _reraise_as_library_errors_dummy_contextr   _connection_factoryrecoverable_connection_errors)rU   r   r   interval_startinterval_stepinterval_maxcallbackr   r   r   ctxs   ``         rZ   r   zConnection._ensure_connection  s    B > 	$##	, 	, 	, 	, 	, 	, 	, -( 	&%CSUU 	 	"($*LB+|'	  	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	s   $A##A'*A'c              #     K   	 d V  d S # ||f$ r  | j         $ r} |t          |                    |d }~w| j        $ r} |t          |                    |d }~ww xY wrq   )r   rA   recoverable_channel_errors)rU   ConnectionErrorChannelErrorr   s       rZ   r   z%Connection._reraise_as_library_errors  s      
	2EEEEE. 	 	 	1 	5 	5 	5!/#c((++4. 	2 	2 	2,s3xx((c1	2s   
 A4AAAc              #     K   d V  d S rq   r>   rf   s    rZ   r   zConnection._dummy_context  s      r\   c                J    | j         r|dz   t          | j                   z   ndS )z?Return true if the cycle is complete after number of `retries`.r   T)rJ   len)rU   r   s     rZ   r   zConnection.completes_cycle  s(    48HFGaK3tx==000$Fr\   c                p    | j         r,|| j         ur%|                     | j                    d| _         dS dS dS )z2Revive connection after connection re-established.N)r   r   )rU   new_channels     rZ   revivezConnection.revive  sO      	)[8M%M%M$$T%:;;;$(D!!!	) 	)%M%Mr\   r   c
           
         	 	t                      		 f
d}
j         d|
_        j        |
_        j        |
_        |
S )a.  Ensure operation completes.

        Regardless of any channel/connection errors occurring.

        Retries by establishing the connection, and reapplying
        the function.

        Arguments:
        ---------
            obj: The object to ensure an action on.
            fun (Callable): Method to apply.

            errback (Callable): Optional callback called each time the
                connection can't be established.  Arguments provided are
                the exception raised and the interval that will
                be slept ``(exc, interval)``.

            max_retries (int): Maximum number of times to retry.
                If this limit is exceeded the connection error
                will be re-raised.

            interval_start (float): The number of seconds we start
                sleeping for.
            interval_step (float): How many seconds added to the interval
                for each retry.
            interval_max (float): Maximum number of seconds to sleep between
                each retry.
            on_revive (Callable): Optional callback called whenever
                revival completes successfully
            retry_errors (tuple): Optional list of errors to retry on
                regardless of the connection state.

        Examples
        --------
            >>> from kombu import Connection, Producer
            >>> conn = Connection('amqp://')
            >>> producer = Producer(conn)

            >>> def errback(exc, interval):
            ...     logger.error('Error: %r', exc, exc_info=1)
            ...     logger.info('Retry in %s seconds.', interval)

            >>> publish = conn.ensure(producer, producer.publish,
            ...                       errback=errback, max_retries=3)
            >>> publish({'hello': 'world'}, routing_key='dest')
        Nc                 J  
 d}j         }j        }t          j        d          }                                5  t          d          D ]=}	  | i |c cd d d            S # $ r+}|k    r                     d|d           Y d }~Ed }~w|$ r}|r|s |k    r                     d|d                                            
o 
|d           d }t          |z
  d          }	                    
|d           j
        }	                    |	           r |	           |dz  }Y d }~d }~w|$ r:}|k    r                     d	|d           
o 
|d           Y d }~7d }~ww xY w	 d d d            d S # 1 swxY w Y   d S )
Nr   r   zensure retry policy error: %rr   )exc_infozensure connection error: %rF)r   zensure channel error: %r)r   r   hasattrr7   r   r   r|   r   maxr   default_channelr   )rz   rW   got_connectionconn_errorschan_errorshas_modern_errorsr   r   remaining_retriesr   r   funr   r   r   r   obj	on_reviveretry_errorsrU   s             rZ   _ensuredz#Connection.ensure.<locals>._ensured"  s   N<K9K ' ?! ! 0022 )4 )4$Qxx (4 (4G'4"sD3F3333)4 )4 )4 )4 )4 )4 )4 )4 ( 5 5 5&2w+7M7M!$C$'! $ 5 5 5 5 5 5 5 5& , , ,) "2C "
 "&2w+7M7M!$A$'! $ 5 5 53GGCOO,0)&203K'4I10M0M-//#-*M<6;	 0    #'"6

7+++$ /%Ig...&!+& 4 4 4&2w7L7L!$>$'! $ 5 5 53GGCOO4G(4)4 )4 )4 )4 )4 )4 )4 )4 )4 )4 )4 )4 )4 )4 )4 )4 )4 )4s`   FA%F%F*!BFFB(E FF/F<FFFFFz	(ensured))tuple__name____doc__
__module__)rU   r   r   r   r   r   r   r   r   r   r   s   `````````` rZ   ensurezConnection.ensure  s    b  77L0	4 0	4 0	4 0	4 0	4 0	4 0	4 0	4 0	4 0	4 0	4 0	4 0	4 0	4b  #|666;!nr\   c                `    |g G fdd          } ||           } | j         ||fi |S )a  Decorator for functions supporting a ``channel`` keyword argument.

        The resulting callable will retry calling the function if
        it raises connection or channel related errors.
        The return value will be a tuple of ``(retval, last_created_channel)``.

        If a ``channel`` is not provided, then one will be automatically
        acquired (remember to close it afterwards).

        See Also
        --------
            :meth:`ensure` for the full list of supported keyword arguments.

        Example:
        -------
            >>> channel = connection.channel()
            >>> try:
            ...    ret, channel = connection.autoretry(
            ...         publish_messages, channel)
            ... finally:
            ...    channel.close()
        c                  z    e Zd Z edd          Z  edd          Z edd          Zd Z fdZ fdZdS )%Connection.autoretry.<locals>.Revivalr   Nr   r   c                    || _         d S rq   )rs   )rU   rs   s     rZ   r[   z.Connection.autoretry.<locals>.Revival.__init__v  s    ",r\   c                    |d<   d S r   r>   )rU   r   channelss     rZ   r   z,Connection.autoretry.<locals>.Revival.revivey  s    %r\   c                    d         |                      | j        j                    |dd         i|d         fS )Nr   r   )r   rs   r   )rU   rz   rW   r   r   s      rZ   __call__z.Connection.autoretry.<locals>.Revival.__call__|  sK    A;&KK ?@@@sD@(1+@@@(1+MMr\   )r   r   __qualname__getattrr   r[   r   r   )r   r   s   rZ   Revivalr   q  s        wsJ55H lD99Jgc9d33G- - -& & & & &N N N N N N N Nr\   r   )r   )rU   r   r   ensure_optionsr   r   r   s    `    @rZ   	autoretryzConnection.autoretryX  sz    . 9	N 	N 	N 	N 	N 	N 	N 	N 	N 	N 	N  t{66<<^<<<r\   c                >     |                                  |           S )N)r   )r   rf   s    rZ   create_transportzConnection.create_transport  s!    't%%''t4444r\   c                `    | j         }|rt          |t                    rt          |          }|S )z'Get the currently used transport class.)rn   r@   rA   r   )rU   rn   s     rZ   r   zConnection.get_transport_cls  s6    * 	=
=# > > 	=-m<<Mr\   c           	     Z     | j         di t          |                     d          fi |S )z3Create a copy of the connection with same settings.F)resolver>   )	__class__ra   _infor   s     rZ   clonezConnection.clone  s8    t~JJTZZZ%>%> I I& I IJJJr\   c                @    | j                             | j                  S rq   )r7   get_heartbeat_intervalrs   rf   s    rZ   r   z!Connection.get_heartbeat_interval  s    ~44T_EEEr\   c                   | j         }|r| j                            ||          }| j        j        }| j        s\|                    d          rGt                              d|                    d           d           |                    d          }n| j        }| j        r| j         d| }d|fd| j	        p|                    d          fd| j
        p|                    d          fd| j        p|                    d          fd| j        p|                    d          fd	| j        fd
| j        fd|fd| j        fd| j        fd| j        p|                    d          fd| j        fd| j        fd| j        fd| j        ff}|S )Nr0   z0No hostname was supplied. Reverting to default ''r=   r1   r2   r3   r4   r5   r6   r7   r8   rP   r9   rF   r:   rN   rV   )rn   resolve_aliasesrM   r7   default_connection_paramsr0   rl   rm   rF   r1   r2   r3   r4   r5   r6   r8   rP   r9   r:   rK   rJ   )rU   r   rn   Dr0   infos         rZ   r   zConnection._info  s   * 	. 044}. .MN4} 	%z!2!2 	%NN>)*z):):> > >? ? ? uuZ((HH}H? 	7/66H66H "t{5aeeHoo6;!%%
*;*;<T.G!%%2G2GHTY/!%%--0t{#DH-( 45 $"89T.G!%%2G2GH4?+$.) $"9:48$
" r\   c                D    t          |                                           S )zGet connection info.)ra   r   rf   s    rZ   r  zConnection.info  s    DJJLL!!!r\   c                    t          | j        | j        | j        | j        | j        | j        t          | j                            S rq   )	r   rn   r0   r1   r2   r3   r4   reprrP   rf   s    rZ   
__eqhash__zConnection.__eqhash__  s=    +T]DK(949d4557 7 	7r\   z**r4   r1   r2   r3   r7   returnrA   c           
        | j         pd}| j        j        rZ| j         }	 | j                            |||          S # t          $ r Y nw xY w| j        r| j         d| }|st          |          }|S | j        r| j         d| }|st          |          }|S |                                 } ||          \  }}}	}
}t          |||||	t          |
          | |          S )z*Convert connection parameters to URL form.r.   r=   )sanitizemask)
r0   r7   rH   as_uriNotImplementedErrorrF   r   r  r   r   )rU   include_passwordr  	getfieldsr0   connection_as_urifieldsr4   r1   r2   vhostr7   s               rZ   r  zConnection.as_uri  sD    =/K>' 	% $~,,%'7? ? ?&     D'+$C$C$C$C!# J$67H$I$I!$$? 	%#'? ? ?X ? ?# J$67H$I$I!$$3<9V3D3D0fhyxvxu))
 
 
 	
s   : 
AAc                    t          | |fi |S )a/  Pool of connections.

        See Also
        --------
            :class:`ConnectionPool`.

        Arguments:
        ---------
            limit (int): Maximum number of active connections.
                Default is no limit.

        Example:
        -------
            >>> connection = Connection('amqp://')
            >>> pool = connection.Pool(2)
            >>> c1 = pool.acquire()
            >>> c2 = pool.acquire()
            >>> c3 = pool.acquire()
            Traceback (most recent call last):
              File "<stdin>", line 1, in <module>
              File "kombu/connection.py", line 354, in acquire
              raise ConnectionLimitExceeded(self.limit)
                kombu.exceptions.ConnectionLimitExceeded: 2
            >>> c1.release()
            >>> c3 = pool.acquire()
        )r"   rU   limitrW   s      rZ   PoolzConnection.Pool  s    6 dE44V444r\   c                    t          | |fi |S )a'  Pool of channels.

        See Also
        --------
            :class:`ChannelPool`.

        Arguments:
        ---------
            limit (int): Maximum number of active channels.
                Default is no limit.

        Example:
        -------
            >>> connection = Connection('amqp://')
            >>> pool = connection.ChannelPool(2)
            >>> c1 = pool.acquire()
            >>> c2 = pool.acquire()
            >>> c3 = pool.acquire()
            Traceback (most recent call last):
              File "<stdin>", line 1, in <module>
              File "kombu/connection.py", line 354, in acquire
              raise ChannelLimitExceeded(self.limit)
                kombu.connection.ChannelLimitExceeded: 2
            >>> c1.release()
            >>> c3 = pool.acquire()
        )r#   r  s      rZ   r#   zConnection.ChannelPool  s    6 411&111r\   c                *    ddl m}  ||p| g|R i |S )z,Create new :class:`kombu.Producer` instance.r   )Producer)	messagingr  )rU   r   rz   rW   r  s        rZ   r  zConnection.Producer  s8    ''''''x49$999&999r\   c                ,    ddl m}  ||p| |g|R i |S )z,Create new :class:`kombu.Consumer` instance.r   )Consumer)r  r  )rU   queuesr   rz   rW   r  s         rZ   r  zConnection.Consumer!  s:    ''''''x4A$AAA&AAAr\   c                .    ddl m}  ||p| |||||fi |S )a  Simple persistent queue API.

        Create new :class:`~kombu.simple.SimpleQueue`, using a channel
        from this connection.

        If ``name`` is a string, a queue and exchange will be automatically
        created using that name as the name of the queue and exchange,
        also it will be used as the default routing key.

        Arguments:
        ---------
            name (str, kombu.Queue): Name of the queue/or a queue.
            no_ack (bool): Disable acknowledgments. Default is false.
            queue_opts (Dict): Additional keyword arguments passed to the
                constructor of the automatically created :class:`~kombu.Queue`.
            queue_args (Dict): Additional keyword arguments passed to the
                constructor of the automatically created :class:`~kombu.Queue`
                for setting implementation extensions (e.g., in RabbitMQ).
            exchange_opts (Dict): Additional keyword arguments passed to the
                constructor of the automatically created
                :class:`~kombu.Exchange`.
            channel (ChannelT): Custom channel to use. If not specified the
                connection default channel is used.
        r   )SimpleQueue)simpler"  )	rU   nameno_ack
queue_opts
queue_argsexchange_optsr   rW   r"  s	            rZ   r"  zConnection.SimpleQueue&  sI    6 	('''''{7?dD&*%(4 4,24 4 	4r\   c                .    ddl m}  ||p| |||||fi |S )a  Simple ephemeral queue API.

        Create new :class:`~kombu.simple.SimpleQueue` using a channel
        from this connection.

        See Also
        --------
            Same as :meth:`SimpleQueue`, but configured with buffering
            semantics. The resulting queue and exchange will not be durable,
            also auto delete is enabled. Messages will be transient (not
            persistent), and acknowledgments are disabled (``no_ack``).
        r   )SimpleBuffer)r#  r*  )	rU   r$  r%  r&  r'  r(  r   rW   r*  s	            rZ   r*  zConnection.SimpleBufferF  sI     	)(((((|GOtT6:&)5 5-35 5 	5r\   c                    |                      d           | j                                        }|                      d|            |S )Nzestablishing connection...zconnection established: %r)r|   r7   establish_connection)rU   conns     rZ   _establish_connectionz Connection._establish_connectionZ  sB    0111~22440$777r\   c                (    || j         j        j        v S rq   )r7   
implementsexchange_type)rU   r1  s     rZ   supports_exchange_typez!Connection.supports_exchange_type`  s     9 GGGr\   c                T    d|                                   dt          |           ddS )Nz<Connection: z at z#x>)r  rv   rf   s    rZ   __repr__zConnection.__repr__c  s,    @t{{}}@@"T((@@@@@r\   c                *    |                                  S rq   )r   rf   s    rZ   __copy__zConnection.__copy__f  s    zz||r\   c                x    | j         t          |                                                                           d fS rq   )r   r   r  valuesrf   s    rZ   
__reduce__zConnection.__reduce__i  s-    ~uTYY[[%7%7%9%9::D@@r\   c                    | S rq   r>   rf   s    rZ   	__enter__zConnection.__enter__l  s    r\   exc_typetype[BaseException] | Noneexc_valBaseException | Noneexc_tbTracebackType | NoneNonec                .    |                                   d S rq   )r   )rU   r=  r?  rA  s       rZ   __exit__zConnection.__exit__o  s     	r\   c                @    | j                             | j                  S rq   )r7   qos_semantics_matches_specrs   rf   s    rZ   rG  z%Connection.qos_semantics_matches_specw  s    ~88IIIr\   c                    d| j         i}| j        }|rKd|v r|d         |d<   d|v r|d         |d<   d|v r|d         |d<   d|v r|d         |d<   d|v r|d         |d<   |S )Nr   r   r   r   r   connect_retries_timeout)r8   rP   )rU   	conn_optstransport_optss      rZ   _extract_failover_optsz!Connection._extract_failover_opts{  s     45	/ 	>..+9-+H	-(>11.<=M.N	*+.00-;O-L	/*//,:>,J	.)(N::"#<= )$r\   c                b    | j          o'| j        duo| j                            | j                  S )z3Return true if the connection has been established.N)r`   r   r7   verify_connectionrf   s    rZ   r   zConnection.connected  s?     L  C ,C001ABB	Dr\   c                ^    | j         s%| j        s|                     dd          S | j        S dS )zThe underlying connection object.

        Warning:
        -------
            This instance is transport specific, so do not
            depend on the interface of this object.
        r   Fr~   N)r`   r   r   r   rf   s    rZ   rs   zConnection.connection  sO     | 	$> .. !U /    ##	$ 	$r\   c                    | j                                          d | _        |                                 | _        d| _        | j        S )NF)rT   r_   r   r.  r   r`   rf   s    rZ   r   zConnection._connection_factory  sC    $$&&& $5577r\   r   c                    |                                  } | j        di | | j        |                                 | _        | j        S )aw  Default channel.

        Created upon access and closed when the connection is closed.

        Note:
        ----
            Can be used for automatic channel handling when you only need one
            channel, and also it is the channel implicitly used if
            a connection is passed instead of a channel, to functions that
            require a channel.
        Nr>   )rL  r   r   r   )rU   rJ  s     rZ   r   zConnection.default_channel  sP     //11	,,),,, ($(LLNND!$$r\   c                ^    d                     | j        t          | j                  g          S )z5The host as a host name/port pair separated by colon.:)joinr0   rA   r4   rf   s    rZ   hostzConnection.host  s%     xxDI7888r\   c                P    | j         |                                 | _         | j         S rq   )r   r   rf   s    rZ   r7   zConnection.transport  s%    ?""3355DOr\   c                    | j         j        S )zAMQP Management API.

        Experimental manager that can be used to manage/monitor the broker
        instance.

        Not available for all transports.
        )r7   managerrf   s    rZ   rX  zConnection.manager  s     ~%%r\   c                &     | j         j        |i |S rq   )r7   get_managerr   s      rZ   rZ  zConnection.get_manager  s    )t~)4:6:::r\   c                t    	 |                                  j        S # t          $ r | j        | j        z   cY S w xY w)zRecoverable connection errors.

        List of connection related exceptions that can be recovered from,
        but where the connection must be closed and re-established first.
        )r   r   r   r   r   rf   s    rZ   r   z(Connection.recoverable_connection_errors  sU    	@))++II 	@ 	@ 	@
 )D,?????	@s    77c                X    	 |                                  j        S # t          $ r Y dS w xY w)zRecoverable channel errors.

        List of channel related exceptions that can be automatically
        recovered from without re-establishing the connection.
        r>   )r   r   r   rf   s    rZ   r   z%Connection.recoverable_channel_errors  s>    	))++FF 	 	 	22	s    
))c                4    |                                  j        S )z8List of exceptions that may be raised by the connection.)r   r   rf   s    rZ   r   zConnection.connection_errors  s     %%''99r\   c                4    |                                  j        S )z5List of exceptions that may be raised by the channel.)r   r   rf   s    rZ   r   zConnection.channel_errors  s     %%''66r\   c                $    | j         j        j        S rq   )r7   r0  
heartbeatsrf   s    rZ   supports_heartbeatszConnection.supports_heartbeats  s    ~(33r\   c                $    | j         j        j        S rq   )r7   r0  asynchronousrf   s    rZ   
is_eventedzConnection.is_evented  s    ~(55r\   )r.   NNNNFFNr-   NNNr   r'   N)r   rq   )NNr   r   r   NTN)NNr   r   r   NN)T)r  rA   )NN)NNNNN)r=  r>  r?  r@  rA  rB  r  rC  )r  r   )]r   r   r   r   r4   r3   r8   r`   r   r   r   rR   rF   rT   r   rP   rN   r:   r  rL   r0   r1   r2   r6   r9   r[   rd   rg   rI   rr   r|   r   r   r   r   r   r   r   r   r   r   r^   r   r   r   r   OperationalErrorr   r   r   r   r   r   r   r   r   r   r   r  r
  r   r  r  r#   r  r  r"  r*  r.  r2  r5  r7  r:  r<  rE  propertyrG  rL  r   rs   r   r   rU  r7   r   rX  rZ  r   r   r   r   ra  rd  r>   r\   rZ   r!   r!   ?   s       C CJ DLOGKJGJ  E 
 & I%O-8<<H<v<<348EJ<=GK0= B' B' B' B'HG G G&* * *
8 8 80G G G* * *
 
 
  J J J J(F F F  
$ 
$ 
$  # # #
       ,   E   )-8:15	7 7 7 7r  '7#42 2 2 ^2   ^G G G) ) ) :>?@,0h h h hT*= *= *= *=X5 5 5  K K KF F F# # # #J" " "7 7 7
 ',$#FHj$2KA A
 
 
 
 
>5 5 5 5:2 2 2 2:: : : :
B B B B
 9=#044 4 4 4@ :> $155 5 5 5(  H H HA A A  A A A      J J XJ  " D D XD $ $ X$      % % % X%( 9 9 X9   X
 & & _&; ; ; @ @ _@ 	 	 _	 : : _: 7 7 _7 4 4 X4 6 6 X6 6 6r\   r!   c                  t     e Zd ZdZej        ZdZd fd	Zd Z	d Z
d Zdd	Zedd            Zd Zd Z xZS )r"   zPool of connections.TNc                Z    || _         t                                          |           d S N)r  rs   superr[   rU   rs   r  rW   r   s       rZ   r[   zConnectionPool.__init__  +    $u%%%%%r\   c                4    | j                                         S rq   )rs   r   rf   s    rZ   newzConnectionPool.new  s    $$&&&r\   c                T    	 |                     d           d S # t          $ r Y d S w xY w)Nreleased)r|   r   rU   resources     rZ   release_resourcezConnectionPool.release_resource  sA    	OOJ''''' 	 	 	DD	s    
''c                .    |                                  d S rq   r   rr  s     rZ   close_resourcezConnectionPool.close_resource  s    r\   皙?c                Z    t          |t                    s|                    |          S d S rq   )r@   r   r   )rU   rs  r   s      rZ   collect_resourcezConnectionPool.collect_resource   s2    (D)) 	4##N333	4 	4r\   Fc              #  ~   K   |                      |          5 }||j        fV  d d d            d S # 1 swxY w Y   d S )N)block)acquirer   )rU   r{  rs   s      rZ   acquire_channelzConnectionPool.acquire_channel$  s      \\\&& 	9*j88888	9 	9 	9 	9 	9 	9 	9 	9 	9 	9 	9 	9 	9 	9 	9 	9 	9 	9s   266c                @   | j         r| j        j        }t          |          | j         t          | j                  z
  k     r]| j                            t          | j                             t          |          | j         t          | j                  z
  k     Yd S d S d S rq   )r  	_resourcequeuer   _dirty
put_nowaitr   ro  )rU   qs     rZ   setupzConnectionPool.setup)  s    : 	:$Aa&&4:DK(8(8888))$tx..999 a&&4:DK(8(888888	: 	: 98r\   c                b    t          |          r
 |            }|                    d           |S )Nacquired)callabler|   rr  s     rZ   preparezConnectionPool.prepare0  s4    H 	"xzzH
###r\   rq   )rw  )F)r   r   r   r   r   ConnectionLimitExceededLimitExceededclose_after_forkr[   ro  rt  rv  ry  r   r}  r  r  __classcell__r   s   @rZ   r"   r"   
  s        6M& & & & & &' ' '    4 4 4 4 9 9 9 ^9: : :      r\   r"   c                  D     e Zd ZdZej        Zd fd	Zd Zd Z	d Z
 xZS )r#   zPool of channels.Nc                Z    || _         t                                          |           d S ri  rj  rl  s       rZ   r[   zChannelPool.__init__<  rm  r\   c                4    t          | j        j                  S rq   )r   rs   r   rf   s    rZ   ro  zChannelPool.new@  s    DO+,,,r\   c                ^   |                                  }| j        r| j        j        }t	          |          | j        t	          | j                  z
  k     rX| j                            t          |                     t	          |          | j        t	          | j                  z
  k     Td S d S d S rq   )ro  r  r  r  r   r  r  r   )rU   r   r  s      rZ   r  zChannelPool.setupC  s    ((**: 	9$Aa&&4:DK(8(8888))$w--888 a&&4:DK(8(888888	9 	9 98r\   c                8    t          |          r
 |            }|S rq   )r  r   s     rZ   r  zChannelPool.prepareK  s"    G 	 giiGr\   rq   )r   r   r   r   r   ChannelLimitExceededr  r[   ro  r  r  r  r  s   @rZ   r#   r#   7  sx        3M& & & & & &- - -9 9 9      r\   r#   r   Channel | Connectionr  r   c                2    t          |           r| j        S | S )zGet channel from object.

    Return the default channel if argument is a connection instance,
    otherwise just return the channel given.
    )is_connectionr   )r   s    rZ   maybe_channelr  Q  s"     W '&&Nr\   r   r	   TypeGuard[Connection]c                ,    t          | t                    S rq   )r@   r!   )r   s    rZ   r  r  \  s    c:&&&r\   )r   r  r  r   )r   r	   r  r  )Cr   
__future__r   osr   sys
contextlibr   	itertoolsr   r   operatorr   typingr   r	   r6   r
   rk   ImportErrorkombur   logr   rs  r   r7   r   r   utils.collectionsr   utils.functionalr   r   r   r   utils.objectsr   	utils.urlr   r   r   r   r   kombu.transport.virtualr   version_infotyping_extensionsr   typesr    __all__r   rl   roundrobin_failoverr  rL   environrM   rQ   r   r!   BrokerConnectionr"   r#   r  r  r>   r\   rZ   <module>r     s'     " " " " " " 				  



 % % % % % % " " " " " " " "       % % % % % % % %MM   IMMM                   > > > > > > > > ( ( ( ( ( ( M M M M M M M M M M M M * * * * * * M M M M M M M M M M M M M M $//////
'!!///////$$$$$$######
9	H		    '  
 *..!7??z~~1599E6 E6 E6 E6 E6 E6 E6 E6P  * * * * *X * * *Z    (   4   ' ' ' ' ' 's   ; 	AA