
    ^f^                        d Z ddlmZ ddlmZ ddlmZ ddlmZ ddl	m
Z
 ddlmZmZ dd	lmZmZmZ dd
lmZ ddlmZmZ ddlmZmZ erddlmZ dZ G d d          Z G d d          ZdS )zSending and receiving messages.    )annotations)count)TYPE_CHECKING   )maybe_declare)compress)is_connectionmaybe_channel)ExchangeQueuemaybe_delivery_mode)ContentDisallowed)dumpsprepare_accept_content)ChannelPromise
maybe_list)TracebackType)r   r   ProducerConsumerc                      e Zd ZdZdZdZdZdZdZdZ	dZ
	 	 	 d dZd Zd Zd Zd	 Zd!dZdeefdZ	 	 	 	 	 d"dZ	 d#dZd Zd Z eee          Zd Zd Zd$dZd ZeZ	 	 d%dZed             Z dS )&r   a  Message Producer.

    Arguments:
    ---------
        channel (kombu.Connection, ChannelT): Connection or channel.
        exchange (kombu.entity.Exchange, str): Optional default exchange.
        routing_key (str): Optional default routing key.
        serializer (str): Default serializer. Default is `"json"`.
        compression (str): Default compression method.
            Default is no compression.
        auto_declare (bool): Automatically declare the default exchange
            at instantiation. Default is :const:`True`.
        on_return (Callable): Callback to call for undeliverable messages,
            when the `mandatory` or `immediate` arguments to
            :meth:`publish` is used. This callback needs the following
            signature: `(exception, exchange, routing_key, message)`.
            Note that the producer needs to drain events to use this feature.
    N Tc                .   || _         || _        |p| j        | _        |p| j        | _        |p| j        | _        |p| j        | _        d | _        | j        t          d          | _        ||| _        | j         r| 	                    | j                    d S d S )Nr   )
_channelexchangerouting_key
serializercompression	on_return_channel_promiser   auto_declarerevive)selfchannelr   r   r   r    r   r   s           A/var/www/html/env/lib/python3.11/site-packages/kombu/messaging.py__init__zProducer.__init__A   s       &:$*:$7&:$*:"4dn $= $RLLDM# ,D= 	'KK&&&&&	' 	'    c                    d| j          dS )Nz<Producer: >r   r"   s    r$   __repr__zProducer.__repr__S   s    -T]----r&   c                8    | j         |                                 fS N)	__class____reduce_args__r*   s    r$   
__reduce__zProducer.__reduce__V   s    ~t335555r&   c                D    d | j         | j        | j        | j        | j        fS r-   )r   r   r   r    r   r*   s    r$   r/   zProducer.__reduce_args__Y   s'    dmT%5t!4#35 	5r&   c                T    | j         j        r| j                                          dS dS )zDeclare the exchange.

        Note:
        ----
            This happens automatically at instantiation when
            the :attr:`auto_declare` flag is enabled.
        N)r   namedeclarer*   s    r$   r4   zProducer.declare]   s5     = 	$M!!#####	$ 	$r&   Fc                2    |rt          || j        |fi |S dS )z=Declare exchange if not already declared during this session.N)r   r#   )r"   entityretryretry_policys       r$   r   zProducer.maybe_declareh   s3     	N uMMMMM	N 	Nr&   c                    t          ||          r|j         ||p|j                  fS | ||p| j        j                  fS r-   )
isinstancer3   delivery_moder   )r"   r   r;   r   r   s        r$   _delivery_detailszProducer._delivery_detailsm   sl     h)) 	="5"57!7# #  
 ,,8T]8
 
 
 	
r&   r   c                   | j         }|g n|}|
i n|
}
|i n|}|| j        n|}|| j        n|}|                     |p| j        |          \  }|d<   |"t          t          |dz                      |d<   |                     ||	||||
          \  }}}| j        r/| j        j	        r#| j        |vr|
                    | j                   |r | j        j        | |fi |} ||||||
|||||||          S )a  Publish message to the specified exchange.

        Arguments:
        ---------
            body (Any): Message body.
            routing_key (str): Message routing key.
            delivery_mode (enum): See :attr:`delivery_mode`.
            mandatory (bool): Currently not supported.
            immediate (bool): Currently not supported.
            priority (int): Message priority. A number between 0 and 9.
            content_type (str): Content type. Default is auto-detect.
            content_encoding (str): Content encoding. Default is auto-detect.
            serializer (str): Serializer to use. Default is auto-detect.
            compression (str): Compression method to use.  Default is none.
            headers (Dict): Mapping of arbitrary headers to pass along
                with the message body.
            exchange (kombu.entity.Exchange, str): Override the exchange.
                Note that this exchange must have been declared.
            declare (Sequence[EntityT]): Optional list of required entities
                that must have been declared before publishing the message.
                The entities will be declared using
                :func:`~kombu.common.maybe_declare`.
            retry (bool): Retry publishing, or declaring entities if the
                connection is lost.
            retry_policy (Dict): Retry configuration, this is the keywords
                supported by :meth:`~kombu.Connection.ensure`.
            expiration (float): A TTL in seconds can be specified per message.
                Default is no expiration.
            timeout (float): Set timeout to wait maximum timeout second
                for message to publish.
            **properties (Any): Additional message properties, see AMQP spec.
        Nr;   i  
expiration)_publishr   r   r<   r   strint_preparer    r3   append
connectionensure)r"   bodyr   r;   	mandatory	immediateprioritycontent_typecontent_encodingr   headersr   r   r7   r8   r4   r>   timeout
propertiesr?   exchange_names                        r$   publishzProducer.publishz   se   L =""W""W)1rr|*5*=d&&;*5*=d&&;595K5K%}6
 6
2z/2 !'*3zD/@+A+A'B'BJ|$/3}}*l,<0" 0",l,  	.!3 	.}G++t}--- 	N-t-dHMMMMHx(L*:Zi7G
 
 	
r&   c                   | j         }|                    ||||||          }|r| j        fd|D              |                    d          }t	          |t
                    r
|j        |d<   |                    ||
|||	|          S )Nc                &    g | ]} |          S  rS   ).0r6   r   s     r$   
<listcomp>z%Producer._publish.<locals>.<listcomp>   s#    999v]]6""999r&   reply_to)r   r   rG   rH   rM   )r#   prepare_messager   getr:   r   r3   basic_publish)r"   rF   rI   rJ   rK   rL   rN   r   rG   rH   r   r4   rM   r#   messagerV   r   s                   @r$   r?   zProducer._publish   s     ,))(Lgz
 
  	: .M99999999 >>*--h&& 	3%-]Jz"$$;9	 % 
 
 	
r&   c                    | j         }t          |t                    rW |            x}| _         | j                            |           | j        r%|j        d                             | j                   |S )Nbasic_return)r   r:   r   r   r!   r   eventsaddr"   r#   s     r$   _get_channelzProducer._get_channel   sq    -g~.. 	C&-gii/GdmM  )))~ C~.224>BBBr&   c                    || _         d S r-   r)   r_   s     r$   _set_channelzProducer._set_channel   s    r&   c                p   t          |          r|| _        t          fd          }t          |t                    r#|| _        |                     |          | _        dS || _        | j        r*| j        j        d                             | j                   |                     |          | _        dS )z*Revive the producer after connection loss.c                      j         S r-   )default_channel)rD   s   r$   <lambda>z!Producer.revive.<locals>.<lambda>   s
    Z-G r&   r\   N)	r	   __connection__r   r:   r   r   r   r]   r^   )r"   r#   rD   s     @r$   r!   zProducer.revive   s    !! 	I J",D$%G%G%G%GHHGg~.. 	3#DM MM'22DMMM $DM~ I$^488HHH MM'22DMMMr&   c                    | S r-   rS   r*   s    r$   	__enter__zProducer.__enter__   s    r&   exc_typetype[BaseException] | Noneexc_valBaseException | Noneexc_tbTracebackType | NonereturnNonec                .    |                                   d S r-   )release)r"   rj   rl   rn   s       r$   __exit__zProducer.__exit__   s     	r&   c                    d S r-   rS   r*   s    r$   rs   zProducer.release   s    r&   c                    |s|p| j         }t          ||          \  }}}n3t          |t                    r|sd}|                    |          }n|sd}|rt          ||          \  }|d<   |||fS )N)r   zutf-8binaryr   )r   r   r:   r@   encoder   )r"   rF   r   rJ   rK   r   rL   s          r$   rB   zProducer._prepare  s      	,#6tJ4J777\+T $$$ ,' /'.${{#344 & ,#+  	G+3D++F+F(D'-(\#333r&   c                V    	 | j         p| j        j        j        S # t          $ r Y d S w xY wr-   )rg   r#   rD   clientAttributeErrorr*   s    r$   rD   zProducer.connection  s?    	&H$,*A*HH 	 	 	DD	s    
(()NNNNNNF)NNFFr   NNNNNNFNNNNr-   rj   rk   rl   rm   rn   ro   rp   rq   )NNNNN)!__name__
__module____qualname____doc__r   r   r   r   r    r   rg   r%   r+   r0   r/   r4   r   r   r   r<   rP   r?   r`   rb   propertyr#   r!   ri   rt   rs   closerB   rD   rS   r&   r$   r   r      s        ( H K J K L I N;?AE' ' ' '$. . .6 6 65 5 5	$ 	$ 	$N N N N
 9=.A#+
 
 
 
 =A;<EIEJJN	D
 D
 D
 D
P 8<
 
 
 
.        h|\22G3 3 3         E;?BF4 4 4 44   X  r&   r   c                  6   e Zd ZdZeZdZdZdZdZdZ	dZ
dZdZdZdZ ed          Z	 	 	 d&dZed             Zej        d             Zd Zd	 Zd
 Zd Zd'dZd Zd(dZd ZeZd Zd Zd Zd Z d)dZ!d*dZ"d  Z#dedfd!Z$d(d"Z%d# Z&d$ Z'ed%             Z(dS )+r   a  Message consumer.

    Arguments:
    ---------
        channel (kombu.Connection, ChannelT): see :attr:`channel`.
        queues (Sequence[kombu.Queue]): see :attr:`queues`.
        no_ack (bool): see :attr:`no_ack`.
        auto_declare (bool): see :attr:`auto_declare`
        callbacks (Sequence[Callable]): see :attr:`callbacks`.
        on_message (Callable): See :attr:`on_message`
        on_decode_error (Callable): see :attr:`on_decode_error`.
        prefetch_count (int): see :attr:`prefetch_count`.
    NTr   c                N   || _         t          |pg           | _        || j        n|| _        |	| j        pg n|| _        || _        |
| _        i | _        ||| _        ||| _	        t          |          | _        |	| _        | j         r|                     | j                    d S d S r-   )r#   r   queuesno_ack	callbacks
on_message
tag_prefix_active_tagsr    on_decode_errorr   acceptprefetch_countr!   )r"   r#   r   r   r    r   r   r   r   r   r   s              r$   r%   zConsumer.__init__  s      2..%+^dkk2;2C$..B( 	$$# ,D&#2D ,V44,< 	&KK%%%%%	& 	&r&   c                N    t          | j                                                  S r-   )list_queuesvaluesr*   s    r$   r   zConsumer.queues  s    DL''))***r&   c                (    d |D             | _         d S )Nc                    i | ]
}|j         |S rS   )r3   )rT   qs     r$   
<dictcomp>z#Consumer.queues.<locals>.<dictcomp>  s    222a222r&   )r   )r"   r   s     r$   r   zConsumer.queues  s    226222r&   c                   | j                                          t          |          x}| _        t	          | j                                                  D ]T\  }}| j                            |d            || j                  x}| j        |j        <   |	                    |           U| j
        r|                                  | j        |                     | j                   dS dS )z&Revive consumer after connection loss.N)r   )r   clearr
   r#   r   r   itemspopr3   r!   r    r4   r   qos)r"   r#   qnamequeues       r$   r!   zConsumer.revive  s    !!!!.w!7!77$, !3!3!5!566 	" 	"LE5LUD)))/4uT\/B/BBEDL,LL!!!! 	LLNNN*HHD$7H88888 +*r&   c                f    | j                                         D ]}|                                 dS )zDeclare queues, exchanges and bindings.

        Note:
        ----
            This is done automatically at instantiation
            when :attr:`auto_declare` is set.
        N)r   r   r4   r"   r   s     r$   r4   zConsumer.declare  s:     \((** 	 	EMMOOOO	 	r&   c                :    | j                             |           dS )a%  Register a new callback to be called when a message is received.

        Note:
        ----
            The signature of the callback needs to accept two arguments:
            `(body, message)`, which is the decoded message body
            and the :class:`~kombu.Message` instance.
        N)r   rC   )r"   callbacks     r$   register_callbackzConsumer.register_callback  s      	h'''''r&   c                .    |                                   | S r-   )consumer*   s    r$   ri   zConsumer.__enter__  s    r&   rj   rk   rl   rm   rn   ro   rp   rq   c                    | j         rZ| j         j        rP| j         j        j        j        }t	          ||          s,	 |                                  d S # t          $ r Y d S w xY wd S d S d S r-   )r#   rD   rz   connection_errorsr:   cancel	Exception)r"   rj   rl   rn   conn_errorss        r$   rt   zConsumer.__exit__  s     < 	DL3 	,18JKg{33 KKMMMMM    DD	 	 	 	 s   A 
AAc                z     || j                   }| j        r|                                 || j        |j        <   |S )zAdd a queue to the list of queues to consume from.

        Note:
        ----
            This will not start consuming from the queue,
            for that you will have to call :meth:`consume` after.
        )r#   r    r4   r   r3   r   s     r$   	add_queuezConsumer.add_queue  s@     dl## 	MMOOO#(UZ r&   c                    t          | j                                                  }|rT|| j        n|}|dd         |d         }}|D ]}|                     ||d           |                     ||d           dS dS )aV  Start consuming messages.

        Can be called multiple times, but note that while it
        will consume from new queues added since the last call,
        it will not cancel consuming from removed queues (
        use :meth:`cancel_by_queue`).

        Arguments:
        ---------
            no_ack (bool): See :attr:`no_ack`.
        NTr   nowaitF)r   r   r   r   _basic_consume)r"   r   r   HTr   s         r$   r   zConsumer.consume  s     dl))++,, 	@$*NT[[F#2#;r
qA G G##E&#FFFF&?????	@ 	@r&   c                    | j         j        }| j                                        D ]} ||           | j                                         dS )zEnd all active queue consumers.

        Note:
        ----
            This does not affect already delivered messages, but it does
            mean the server will not send any more messages for this consumer.
        N)r#   basic_cancelr   r   r   )r"   r   tags      r$   r   zConsumer.cancel  sV     *$++-- 	 	CF3KKKK!!!!!r&   c                D   t          |t                    r|j        n|}	 | j                            |          }| j                            |           n# t          $ r Y nw xY w| j                            |d           dS # | j                            |d           w xY w)zCancel consumer by queue name.N)	r:   r   r3   r   r   r#   r   KeyErrorr   )r"   r   r   r   s       r$   cancel_by_queuezConsumer.cancel_by_queue  s    (66A

E	*#''..C L%%c****  	 	 	D	
 LUD)))))DLUD))))s(   A B 
A"B !A""B Bc                P    |}t          |t                    r|j        }|| j        v S )z8Return :const:`True` if currently consuming from queue'.)r:   r   r3   r   )r"   r   r3   s      r$   consuming_fromzConsumer.consuming_from  s.    eU## 	:Dt(((r&   c                b    t          d | j                                        D                       S )zPurge messages from all queues.

        Warning:
        -------
            This will *delete all ready messages*, there is no undo operation.
        c              3  >   K   | ]}|                                 V  d S r-   )purge)rT   r   s     r$   	<genexpr>z!Consumer.purge.<locals>.<genexpr>!  s*      DDU5;;==DDDDDDr&   )sumr   r   r*   s    r$   r   zConsumer.purge  s/     DDdl.A.A.C.CDDDDDDr&   c                :    | j                             |           dS )a  Enable/disable flow from peer.

        This is a simple flow-control mechanism that a peer can use
        to avoid overflowing its queues or otherwise finding itself
        receiving more messages than it can process.

        The peer that receives a request to stop sending content
        will finish sending the current content (if any), and then wait
        until flow is reactivated.
        N)r#   flow)r"   actives     r$   r   zConsumer.flow#  s      	&!!!!!r&   r   Fc                :    | j                             |||          S )a  Specify quality of service.

        The client can request that messages should be sent in
        advance so that when the client finishes processing a message,
        the following message is already held locally, rather than needing
        to be sent down the channel. Prefetching gives a performance
        improvement.

        The prefetch window is Ignored if the :attr:`no_ack` option is set.

        Arguments:
        ---------
            prefetch_size (int): Specify the prefetch window in octets.
                The server will send a message in advance if it is equal to
                or smaller in size than the available prefetch size (and
                also falls within other prefetch limits). May be set to zero,
                meaning "no specific limit", although other prefetch limits
                may still apply.

            prefetch_count (int): Specify the prefetch window in terms of
                whole messages.

            apply_global (bool): Apply new settings globally on all channels.
        )r#   	basic_qos)r"   prefetch_sizer   apply_globals       r$   r   zConsumer.qos0  s&    2 |%%m&4&24 4 	4r&   c                8    | j                             |          S )a  Redeliver unacknowledged messages.

        Asks the broker to redeliver all unacknowledged messages
        on the specified channel.

        Arguments:
        ---------
            requeue (bool): By default the messages will be redelivered
                to the original recipient. With `requeue` set to true, the
                server will attempt to requeue the message, potentially then
                delivering it to an alternative subscriber.
        )requeue)r#   basic_recover)r"   r   s     r$   recoverzConsumer.recoverM  s     |))'):::r&   c                X    | j         }|st          d          fd|D              dS )a  Method called when a message is received.

        This dispatches to the registered :attr:`callbacks`.

        Arguments:
        ---------
            body (Any): The decoded message body.
            message (~kombu.Message): The message instance.

        Raises
        ------
            NotImplementedError: If no consumer callbacks have been
                registered.
        z$Consumer does not have any callbacksc                (    g | ]} |          S rS   rS   )rT   r   rF   rZ   s     r$   rU   z$Consumer.receive.<locals>.<listcomp>n  s%    ;;;X$	 	 ;;;r&   N)r   NotImplementedError)r"   rF   rZ   r   s    `` r$   receivezConsumer.receive\  sH     N	 	N%&LMMM;;;;;;;;;;;r&   c                    | j                             |j                  }|4|                     ||          }|                    || j        ||           |S )Nr   )r   rX   r3   _add_tagr   _receive_callback)r"   r   consumer_tagr   r   r   s         r$   r   zConsumer._basic_consumep  s^    ##EJ//;--|44CMM#t5!'  8 8 8
r&   c                    |p,d                     | j        t          | j                            }|| j        |j        <   |S )Nz{}{})formatr   next_tagsr   r3   )r"   r   r   r   s       r$   r   zConsumer._add_tagy  sA     /fmmOT$*--/ /(+%*%
r&   c                   | j         }| j        | j        d }}}	 t          |dd           }|r ||          }|||_         |j        r|                    | j                  S |rd n|                                }|r ||          n|                     ||          S # t          $ r)}| j        s |                     ||           Y d }~d S d }~ww xY w)Nmessage_to_python)
r   r   r#   getattrerrors_reraise_errorr   decoder   r   )r"   rZ   r   on_mr#   decodedm2pexcs           r$   r   zConsumer._receive_callback  s   !%$,wg	M'#6==C '#g,,!!'~ D--d.BCCC"8dd(8(8G %)L44===dll7G.L.LL  	/ 	/ 	/'   #.........	/s   AB !B 
C&C

Cc                B    dt          |           j         d| j         dS )N<z: r(   )typer~   r   r*   s    r$   r+   zConsumer.__repr__  s&    84::&88$+8888r&   c                H    	 | j         j        j        S # t          $ r Y d S w xY wr-   )r#   rD   rz   r{   r*   s    r$   rD   zConsumer.connection  s7    	<*11 	 	 	DD	s    
!!)	NNNNNNNNNr}   r-   )r   r   Fr|   ))r~   r   r   r   r   r#   r   r   r    r   r   r   r   r   r   r   r   r%   r   setterr!   r4   r   ri   rt   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r+   rD   rS   r&   r$   r   r   '  s?         * G F F L I$ J O F N GE!HHEGKBF>B& & & &* + + X+ ]3 3 ]39 9 9"	 	 		( 	( 	(       @ @ @ @*" " " E
* 
* 
*) ) )E E E" " "4 4 4 4:; ; ; ;< < <( 26$T      M M M&9 9 9   X  r&   r   N)r   
__future__r   	itertoolsr   typingr   commonr   r   r   rD   r	   r
   r6   r   r   r   
exceptionsr   serializationr   r   utils.functionalr   r   typesr   __all__r   r   rS   r&   r$   <module>r      si   % % " " " " " "                   ! ! ! ! ! ! ! ! ! ! ! ! 4 4 4 4 4 4 4 4 8 8 8 8 8 8 8 8 8 8 ) ) ) ) ) ) 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 8 $######
7N N N N N N N Nbs s s s s s s s s sr&   