
    Yf                        d Z ddlmZ ddlZddlmZ ddlmZ ddlm	Z	 ddl
mZ e	dk    rdd	lmZ ndd
lmZ  G d d          Z G d de          Z G d dej                  ZdS )zK
Tests for the internal implementation details of L{twisted.internet.udp}.
    )annotationsN)udp)DatagramProtocol)platformType)unittestwin32)WSAEWOULDBLOCK)EWOULDBLOCKc                  *    e Zd ZdZddZdd	ZddZdS )StringUDPSocketa  
    A fake UDP socket object, which returns a fixed sequence of strings and/or
    socket errors.  Useful for testing.

    @ivar retvals: A C{list} containing either strings or C{socket.error}s.

    @ivar connectedAddr: The address the socket is connected to.
    retvalslist[bytes | socket.error]returnNonec                "    || _         d | _        d S N)r   connectedAddr)selfr   s     Z/var/www/html/env/lib/python3.11/site-packages/twisted/internet/test/test_udp_internals.py__init__zStringUDPSocket.__init__    s    ,0    addrobjectc                    || _         d S r   )r   )r   r   s     r   connectzStringUDPSocket.connect$   s    !r   sizeinttuple[bytes, None]c                v    | j                             d          }t          |t          j                  r||dfS )zH
        Return (or raise) the next value from C{self.retvals}.
        r   N)r   pop
isinstancesocketerror)r   r   rets      r   recvfromzStringUDPSocket.recvfrom'   s<     lq!!c6<(( 	IDyr   N)r   r   r   r   )r   r   r   r   )r   r   r   r   )__name__
__module____qualname____doc__r   r   r%    r   r   r   r      sZ         1 1 1 1" " " "     r   r   c                  "    e Zd ZdZddZdd	Zd
S )	KeepReadsz%
    Accumulate reads in a list.
    r   r   c                    g | _         d S r   )reads)r   s    r   r   zKeepReads.__init__6   s    "$


r   databytesr   r   c                :    | j                             |           d S r   )r.   append)r   r/   r   s      r   datagramReceivedzKeepReads.datagramReceived9   s    
$r   Nr   r   )r/   r0   r   r   r   r   )r&   r'   r(   r)   r   r3   r*   r   r   r,   r,   1   sF         % % % %           r   r,   c                  2    e Zd ZdZd	dZd	dZd	dZd	dZdS )
ErrorsTestsz/
    Error handling tests for C{udp.Port}.
    r   r   c           	        t           j                            d           |                     t           j        j        d           t                      }t          j        d|          }t          ddt          j	        d          dt          j	        d          g          |_        |
                                 |                     |j        ddg           |
                                 |                     |j        g d           dS )z
        Socket reads with some good data followed by a socket error which can
        be ignored causes reading to stop, and no log messages to be logged.
        iN   result   123   456)r8   r9   r:   )r   _sockErrReadIgnorer2   
addCleanupremover,   Portr   r"   r#   doReadassertEqualr.   r   protocolports      r   test_socketReadNormalz!ErrorsTests.test_socketReadNormalB   s     	%%e,,,.5u===;;xh'' &U 3 3VV\%=P=PQ
 
 	)V)<===)D)D)DEEEEEr   c                (   t           j                            d           |                     t           j        j        d           t                      }d |_        t          j        d|          }t          dt          j
        d          dt          j
        t                    g          |_	        |                                 |                     |j        dg           |                                 |                     |j        ddg           dS )z
        If the socket is unconnected, socket reads with an immediate
        connection refusal are ignored, and reading stops. The protocol's
        C{connectionRefused} method is not called.
        c                     ddz  S )N   r   r*   r*   r   r   <lambda>z5ErrorsTests.test_readImmediateError.<locals>.<lambda>e   s
    QU r   N   a   b)r   _sockErrReadRefuser2   r<   r=   r,   connectionRefusedr>   r   r"   r#   r
   r?   r@   r.   rA   s      r   test_readImmediateErrorz#ErrorsTests.test_readImmediateErrorX   s     	%%e,,,.5u===;;%2]"xh'' &6<&&fl;.G.GH
 
 	$000$66666r   c                   t           j                            d           |                     t           j        j        d           t                      }g fd|_        t          j        d|          }t          dt          j
        d          dt          j
        t                    g          |_	        |                    dd           |                                 |                     |j        dg           |                     dg           |                                 |                     |j        ddg           |                     dg           dS )	z
        If the socket connected, socket reads with an immediate
        connection refusal are ignored, and reading stops. The protocol's
        C{connectionRefused} method is called.
        rF   c                 .                          d          S )NT)r2   )refuseds   r   rI   z>ErrorsTests.test_connectedReadImmediateError.<locals>.<lambda>   s    W^^D-A-A r   NrJ   rK   z	127.0.0.1i'  T)r   rL   r2   r<   r=   r,   rM   r>   r   r"   r#   r
   r   r?   r@   r.   )r   rB   rC   rQ   s      @r    test_connectedReadImmediateErrorz,ErrorsTests.test_connectedReadImmediateErrort   s7    	%%e,,,.5u===;;%A%A%A%A"xh''%6<&&fl;.G.GH
 
 	[$''' 	$0004&))) 	$6664&)))))r   c                   t                      }t          j        d|          }t          dt	          j        d          g          |_        |                     t          j        |j                   |                     |j	        dg           dS )zG
        Socket reads with an unknown socket error are raised.
        Ns   goodi)
r,   r   r>   r   r"   r#   assertRaisesr?   r@   r.   rA   s      r   test_readUnknownErrorz!ErrorsTests.test_readUnknownError   sx     ;;xh'' &wU0C0C&DEE&,444'33333r   Nr4   )r&   r'   r(   r)   rD   rN   rR   rU   r*   r   r   r6   r6   =   sr         F F F F,7 7 7 78* * * *>
4 
4 
4 
4 
4 
4r   r6   )r)   
__future__r   r"   twisted.internetr   twisted.internet.protocolr   twisted.python.runtimer   twisted.trialr   errnor	   r
   r   r,   SynchronousTestCaser6   r*   r   r   <module>r]      s:    # " " " " "              6 6 6 6 6 6 / / / / / / " " " " " "73333333!!!!!!       6	  	  	  	  	   	  	  	 `4 `4 `4 `4 `4(. `4 `4 `4 `4 `4r   