
    ^fm/                       d Z ddlmZ ddlmZ ddlmZ ddlmZ ddl	m
Z
 ddlmZmZ 	 ddlZdd	lmZmZmZmZ dd
lmZmZ dZdZn# e$ r	 dZdxZZY nw xY wddlmZ  ee          ZdZ G d de          Z G d dej                  Z G d dej                   Z  G d dej!                  Z! G d dej"                  Z"dS )a	  confluent-kafka transport module for Kombu.

Kafka transport using confluent-kafka library.

**References**

- http://docs.confluent.io/current/clients/confluent-kafka-python

**Limitations**

The confluent-kafka transport does not support PyPy environment.

Features
========
* Type: Virtual
* Supports Direct: Yes
* Supports Topic: Yes
* Supports Fanout: No
* Supports Priority: No
* Supports TTL: No

Connection String
=================
Connection string has the following format:

.. code-block::

    confluentkafka://[USER:PASSWORD@]KAFKA_ADDRESS[:PORT]

Transport Options
=================
* ``connection_wait_time_seconds`` - Time in seconds to wait for connection
  to succeed. Default ``5``
* ``wait_time_seconds`` - Time in seconds to wait to receive messages.
  Default ``5``
* ``security_protocol`` - Protocol used to communicate with broker.
  Visit https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md for
  an explanation of valid values. Default ``plaintext``
* ``sasl_mechanism`` - SASL mechanism to use for authentication.
  Visit https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md for
  an explanation of valid values.
* ``num_partitions`` - Number of partitions to create. Default ``1``
* ``replication_factor`` - Replication factor of partitions. Default ``1``
* ``topic_config`` - Topic configuration. Must be a dict whose key-value pairs
  correspond with attributes in the
  http://kafka.apache.org/documentation.html#topicconfigs.
* ``kafka_common_config`` - Configuration applied to producer, consumer and
  admin client. Must be a dict whose key-value pairs correspond with attributes
  in the https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md.
* ``kafka_producer_config`` - Producer configuration. Must be a dict whose
  key-value pairs correspond with attributes in the
  https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md.
* ``kafka_consumer_config`` - Consumer configuration. Must be a dict whose
  key-value pairs correspond with attributes in the
  https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md.
* ``kafka_admin_config`` - Admin client configuration. Must be a dict whose
  key-value pairs correspond with attributes in the
  https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md.
    )annotations)Empty)virtual)cached_property)str_to_bytes)dumpsloadsN)ConsumerKafkaExceptionProducerTopicPartition)AdminClientNewTopic )
get_loggeri#  c                      e Zd ZdZdZdS )NoBrokersAvailablez(Kafka broker is not available exception.TN)__name__
__module____qualname____doc__	retriabler       P/var/www/html/env/lib/python3.11/site-packages/kombu/transport/confluentkafka.pyr   r   Z   s        22IIIr   r   c                  $     e Zd ZdZd fd	Z xZS )MessagezMessage object.Nc                t    |                     d          | _         t                      j        |fd|i| d S )Ntopicchannel)getr   super__init__)selfpayloadr   kwargs	__class__s       r   r"   zMessage.__init__c   s@    [[))
<<'<V<<<<<r   N)r   r   r   r   r"   __classcell__r&   s   @r   r   r   `   sC        = = = = = = = = = =r   r   c                  D    e Zd ZdZi Zd Zd Zd Zd Zd Z	ddZ
dd
Zd	S )QoSzQuality of Service guarantees.c                L    | j          pt          | j                  | j         k     S )zReturn true if the channel can be consumed from.

        :returns: True, if this QoS object can accept a message.
        :rtype: bool
        prefetch_countlen_not_yet_ackedr#   s    r   can_consumezQoS.can_consumem   s.     && #d.A*B*BT+ 	r   c                L    | j         r| j         t          | j                  z
  S dS )N   r-   r1   s    r   can_consume_max_estimatezQoS.can_consume_max_estimatev   s+     	&T-@)A)AAA1r   c                    || j         |<   d S r'   r0   )r#   messagedelivery_tags      r   appendz
QoS.append|   s    ,3L)))r   c                    | j         |         S r'   r7   )r#   r9   s     r   r    zQoS.get   s    "<00r   c                    || j         vrd S | j                             |          }| j                            |j                  }|                                 d S r'   )r0   popr   _get_consumerr   commit)r#   r9   r8   consumers       r   ackzQoS.ack   sX    t222F%)),77<--gm<<r   Fc                f   |r| j                             |          }| j                            |j                  }|                                D ]I}t          |j        |j                  }|                    |g          \  }|	                    |           JdS | 
                    |           dS )zReject a message by delivery tag.

        If requeue is True, then the last consumed message is reverted so
        it'll be refetched on the next attempt.
        If False, that message is consumed and ignored.
        N)r0   r=   r   r>   r   
assignmentr   	partition	committedseekrA   )r#   r9   requeuer8   r@   rC   topic_partitioncommitted_offsets           r   rejectz
QoS.reject   s      		#)--l;;G|11'-@@H&1133 0 0
"01;1E#G #G%-%7%78I%J%J"!.////	0 0 HH\"""""r   Nc                    d S r'   r   )r#   stderrs     r   restore_unacked_oncezQoS.restore_unacked_once       r   )Fr'   )r   r   r   r   r0   r2   r5   r:   r    rA   rJ   rM   r   r   r   r+   r+   h   s        ((N    4 4 41 1 1  # # # #$     r   r+   c                       e Zd ZdZeZeZdZdZdZ fdZ	d Z
d Zd Zd Zd	 Zd
 Zd Zd Zd Zd Zed             Zed             Zed             Zed             Zed             Zed             Z fdZ xZS )ChannelzKafka Channel.   Nc                     t                      j        |i | i | _        i | _        |                                 | _        d S r'   )r!   r"   _kafka_consumers_kafka_producers_open_client)r#   argsr%   r&   s      r   r"   zChannel.__init__   sB    $)&))) " "zz||r   c                H    t          |                              dd          S )z>Need to sanitize the name, celery sometimes pushes in @ signs.@ )strreplace)r#   queues     r   sanitize_queue_namezChannel.sanitize_queue_name   s    5zz!!#r***r   c                    |                      |          }| j                            |d          }|<t          i | j        | j                            d          pi           }|| j        |<   |S )z9Create/get a producer instance for the given topic/queue.Nkafka_producer_config)r^   rT   r    r   common_configoptions)r#   r]   producers      r   _get_producerzChannel._get_producer   s    ((//(,,UD99 !$!<##$;<<B!  H ,4D!%(r   c                   |                      |          }| j                            |d          }|Yt          | dddd| j        | j                            d          pi           }|                    |g           || j        |<   |S )z9Create/get a consumer instance for the given topic/queue.Nz-consumer-groupearliestF)zgroup.idzauto.offset.resetzenable.auto.commitkafka_consumer_config)r^   rS   r    r
   ra   rb   	subscribe)r#   r]   r@   s      r   r>   zChannel._get_consumer   s    ((//(,,UD99$555%/&+! ! $	!
 <##$;<<B!  H w'''+3D!%(r   c                    |                      |          }|                     |          }|                    |t          t	          |                               |                                 dS )z!Put a message on the topic/queue.N)r^   rd   producer   r   flush)r#   r]   r8   r%   rc   s        r   _putzChannel._put   sb    ((//%%e,,U7^^ < <===r   c                   |                      |          }|                     |          }d}	 |                    | j                  }n# t          $ r Y nw xY w|st                      |                                }|r(t                              |           t                      i t          |	                                          d|
                                iS )z#Get a message from the topic/queue.Nr   )r^   r>   pollwait_time_secondsStopIterationr   errorloggerr	   valuer   )r#   r]   r%   r@   r8   rq   s         r   _getzChannel._get   s    ((//%%e,,	mmD$:;;GG 	 	 	D	  	''M 	LL''MC%((C'7==??CCCs   A	 	
AAc                    |                      |          }| j        |                                          | j                            |           | j                            |g           dS )zDelete a queue/topic.N)r^   rS   closer=   clientdelete_topics)r#   r]   rW   r%   s       r   _deletezChannel._delete   sd    ((//e$**,,,!!%(((!!5'*****r   c                D   |                      |          }| j                            |d          }|dS d}|                                D ]T}t	          ||j                  }|                    |          \  }}|                    |g          \  }|||j        z
  z  }U|S )z6Get the number of pending messages in the topic/queue.Nr   )	r^   rS   r    rC   r   rD   get_watermark_offsetsrE   offset)	r#   r]   r@   sizerC   rH   _
end_offsetrI   s	            r   _sizezChannel._size   s    ((//(,,UD991"--// 	9 	9J,UJ4HIIO&<<_MMOQ
!)!3!3_4E!F!FJ!1!888DDr   c           	     h   |                      |          }|| j                                        j        v rdS t	          || j                            dd          | j                            dd          | j                            di                     }| j                            |g           dS )z(Create a new topic if it does not exist.Nnum_partitionsr4   replication_factortopic_config)r   r   config)
new_topics)r^   rw   list_topicstopicsr   rb   r    create_topics)r#   r]   r%   r   s       r   
_new_queuezChannel._new_queue  s    ((//DK++--444F<++,<a@@#|//0DaHH<##NB77	
 
 
 	!!eW!55555r   c                l    |                      |          }|| j                                        j        v S )z Check if a topic already exists.)r^   rw   r   r   )r#   r]   r%   s      r   
_has_queuezChannel._has_queue  s1    ((////11888r   c                    t          i | j        | j                            d          pi           }	 |                    | j                   n&# t          j        $ r}t          |          d }~ww xY w|S )Nkafka_admin_config)timeout)	r   ra   rb   r    r   ro   confluent_kafkar   r   )r#   rw   es      r   rU   zChannel._open  s     
 
| 455;
  
	(t'=>>>>- 	( 	( 	($Q'''	( s   A A3A..A3c                P    | j         |                                 | _         | j         S r'   )rV   rU   r1   s    r   rw   zChannel.client'  s!    <::<<DL|r   c                $    | j         j        j        S r'   )
connectionrw   transport_optionsr1   s    r   rb   zChannel.options-  s    %77r   c                    | j         j        S r'   )r   rw   r1   s    r   conninfozChannel.conninfo1  s    %%r   c                B    | j                             d| j                  S )Nro   )rb   r    default_wait_time_secondsr1   s    r   ro   zChannel.wait_time_seconds5  s$    |!?
 
 	
r   c                B    | j                             d| j                  S )Nconnection_wait_time_seconds)rb   r    $default_connection_wait_time_secondsr1   s    r   r   z$Channel.connection_wait_time_seconds;  s%    |*5
 
 	
r   c                   | j         j        }d|j         dt          |j                  pt
           i}| j                            dd          }|                                dk    r<|	                    ||j
        |j        | j                            d          d           |	                    | j                            d          pi            |S )Nzbootstrap.servers:security_protocol	plaintextsasl_mechanism)zsecurity.protocolzsasl.usernamezsasl.passwordzsasl.mechanismkafka_common_config)r   rw   hostnameintportDEFAULT_PORTrb   r    lowerupdateuseridpassword)r#   r   r   r   s       r   ra   zChannel.common_configB  s    ?)$KKs8='9'9'I\KK
 !L,,-@+NN""$$33MM%6!)!)!2"&,"2"23C"D"D	     	dl&&'<==CDDDr   c                    t                                                       i | _        | j                                        D ]}|                                 i | _        d S r'   )r!   rv   rT   rS   values)r#   r@   r&   s     r   rv   zChannel.closeU  s[     "-4466 	 	HNN "r   )r   r   r   r   r+   r   r   r   rV   r"   r^   rd   r>   rl   rt   ry   r   r   r   rU   propertyrw   rb   r   r   ro   r   ra   rv   r(   r)   s   @r   rP   rP      s       
CG !+,(G$ $ $ $ $+ + +    "  D D D*+ + +   6 6 69 9 9
     X
 8 8 X8 & & X& 
 
 _

 
 
 _
   _$# # # # # # # # #r   rP   c                  \     e Zd ZdZdddZeZeZdZd	Z	e
fZ fd
Zd Z fdZ fdZ xZS )	TransportzKafka Transport.F**urir[   returnc                    d S r'   r   )r#   r   include_passwordmasks       r   as_urizTransport.as_urib  rN   r   kafkaconfluentkafkac                h    t           t          d           t                      j        |fi | d S )Nz,The confluent-kafka library is not installed)r   ImportErrorr!   r"   )r#   rw   r%   r&   s      r   r"   zTransport.__init__p  s=    "LMMM**6*****r   c                    t           j        S r'   )r   __version__r1   s    r   driver_versionzTransport.driver_versionu  s    **r   c                D    t                                                      S r'   )r!   establish_connection)r#   r&   s    r   r   zTransport.establish_connectionx  s    ww++---r   c                F    t                                          |          S r'   )r!   close_connection)r#   r   r&   s     r   r   zTransport.close_connection{  s    ww''
333r   )Fr   )r   r[   r   r[   )r   r   r   r   r   rP   r   default_portdriver_typedriver_namer   recoverable_connection_errorsr"   r   r   r   r(   r)   s   @r   r   r   _  s             GLK"K 	%!+ + + + +
+ + +. . . . .4 4 4 4 4 4 4 4 4r   r   )#r   
__future__r   r]   r   kombu.transportr   kombu.utilsr   kombu.utils.encodingr   kombu.utils.jsonr   r	   r   r
   r   r   r   confluent_kafka.adminr   r   KAFKA_CONNECTION_ERRORSKAFKA_CHANNEL_ERRORSr   	kombu.logr   r   rr   r   r   r   r+   rP   r   r   r   r   <module>r      sA  : :x # " " " " "       # # # # # # ' ' ' ' ' ' - - - - - - ) ) ) ) ) ) ) )81 1 1 1 1 1 1 1 1 1 1 1;;;;;;;;  8 8 8O5772228 !          	H		       = = = = =go = = =4 4 4 4 4'+ 4 4 4n}# }# }# }# }#go }# }# }#@4 4 4 4 4! 4 4 4 4 4s   A AA