--- a/src/zeroconf/_listener.pxd +++ b/src/zeroconf/_listener.pxd @@ -11,6 +11,8 @@ cdef object log cdef object DEBUG_ENABLED cdef bint TYPE_CHECKING +cdef cython.uint _MAX_DEFERRED_ADDRS +cdef cython.uint _MAX_DEFERRED_PER_ADDR cdef cython.uint _MAX_MSG_ABSOLUTE cdef cython.uint _DUPLICATE_PACKET_SUPPRESSION_INTERVAL @@ -38,6 +40,8 @@ cdef _cancel_any_timers_for_addr(self, object addr) + cdef _evict_oldest_deferred(self) + @cython.locals(incoming=DNSIncoming, deferred=list) cpdef handle_query_or_defer( self, --- a/src/zeroconf/_listener.py +++ b/src/zeroconf/_listener.py @@ -32,7 +32,12 @@ from ._protocol.incoming import DNSIncoming from ._transport import _WrappedTransport, make_wrapped_transport from ._utils.time import current_time_millis, millis_to_seconds -from .const import _DUPLICATE_PACKET_SUPPRESSION_INTERVAL, _MAX_MSG_ABSOLUTE +from .const import ( + _DUPLICATE_PACKET_SUPPRESSION_INTERVAL, + _MAX_DEFERRED_ADDRS, + _MAX_DEFERRED_PER_ADDR, + _MAX_MSG_ABSOLUTE, +) if TYPE_CHECKING: from ._core import Zeroconf @@ -197,7 +202,17 @@ self._respond_query(msg, addr, port, transport, v6_flow_scope) return + if addr not in self._deferred and len(self._deferred) >= _MAX_DEFERRED_ADDRS: + # Bound total deferred addrs so a spoofed-source flood + # cannot keep adding distinct entries; evict the oldest + # (insertion-order) entry and discard its in-flight queue. + self._evict_oldest_deferred() + deferred = self._deferred.setdefault(addr, []) + if len(deferred) >= _MAX_DEFERRED_PER_ADDR: + # Bound per-addr queue length; further fragments from the + # same source are dropped until the timer flushes. + return # If we get the same packet we ignore it for incoming in reversed(deferred): if incoming.data == msg.data: @@ -222,6 +237,21 @@ if addr in self._timers: self._timers.pop(addr).cancel() + def _evict_oldest_deferred(self) -> None: + """Discard the oldest deferred addr's reassembly state. + + Used when ``_MAX_DEFERRED_ADDRS`` would be exceeded; the + evicted addr's queue and timer are dropped without firing, so + the bound holds even when an attacker rotates source IPs. + Eviction is FIFO (oldest by first-seen, via dict insertion + order) rather than LRU so an active flooder cannot pin its + slots by re-sending into the same addr. + """ + oldest_addr = next(iter(self._deferred)) + self._cancel_any_timers_for_addr(oldest_addr) + self._deferred_deadlines.pop(oldest_addr, None) + del self._deferred[oldest_addr] + def _respond_query( self, msg: DNSIncoming | None, --- a/src/zeroconf/const.py +++ b/src/zeroconf/const.py @@ -65,6 +65,20 @@ # to retain by multicasting many unique-name records. _MAX_CACHE_RECORDS = 10000 +# Per-addr cap on the number of truncated (TC-bit) packets retained for +# RFC 6762 ยง18.5 reassembly. The spec anticipates only a handful of +# segments per truncated query; 16 is well above legitimate need and +# keeps the per-arrival dedup scan a constant-time cost under a flood. +_MAX_DEFERRED_PER_ADDR = 16 + +# Per-listener cap on the number of distinct addrs with in-flight +# TC-deferral state. Each entry can hold up to _MAX_DEFERRED_PER_ADDR +# packets of up to _MAX_MSG_ABSOLUTE bytes; 512 leaves headroom for a +# legitimate burst (LAN-wide power-resume / boot storm where many +# devices announce at once) while bounding worst-case memory at +# ~72 MB even when a peer floods with spoofed source IPs. +_MAX_DEFERRED_ADDRS = 512 + _DNS_PACKET_HEADER_LEN = 12 _MAX_MSG_TYPICAL = 1460 # unused --- a/tests/test_core.py +++ b/tests/test_core.py @@ -18,7 +18,7 @@ import pytest import zeroconf as r -from zeroconf import NotRunningException, Zeroconf, const, current_time_millis +from zeroconf import NotRunningException, Zeroconf, _listener, const, current_time_millis from zeroconf._listener import AsyncListener, _WrappedTransport from zeroconf._protocol.incoming import DNSIncoming from zeroconf.asyncio import AsyncZeroconf @@ -676,6 +676,101 @@ zc.close() +def _make_distinct_tc_packets(count: int, name_prefix: str = "q") -> list[bytes]: + """Generate ``count`` byte-distinct TC-flagged query packets for flood inputs.""" + packets = [] + for i in range(count): + out = r.DNSOutgoing(const._FLAGS_QR_QUERY | const._FLAGS_TC) + out.add_question(r.DNSQuestion(f"{name_prefix}{i}._tcp.local.", const._TYPE_PTR, const._CLASS_IN)) + packets.append(out.packets()[0]) + return packets + + +def _synthetic_source_ip(i: int) -> str: + """Distinct synthetic source IPs from the documentation ranges.""" + if i < 256: + return f"203.0.113.{i}" + if i < 512: + return f"198.51.100.{i - 256}" + return f"192.0.2.{i - 512}" + + +def test_tc_bit_per_addr_queue_is_bounded(quick_timing: None) -> None: + """Per-addr deferred queue must not grow past ``_MAX_DEFERRED_PER_ADDR``.""" + zc = Zeroconf(interfaces=["127.0.0.1"]) + _wait_for_start(zc) + protocol = zc.engine.protocols[0] + source_ip = "203.0.113.21" + + extra = 4 + packets = _make_distinct_tc_packets(const._MAX_DEFERRED_PER_ADDR + extra) + + # Push the reassembly timer well past any possible test runtime + # so the bound under test is the only thing that can drop entries. + with patch.object(_listener, "_TC_DELAY_RANDOM_INTERVAL", (60_000, 60_001)): + for raw in packets: + threadsafe_query(zc, protocol, r.DNSIncoming(raw), source_ip, const._MDNS_PORT, Mock(), ()) + + assert len(protocol._deferred[source_ip]) == const._MAX_DEFERRED_PER_ADDR + # Last ``extra`` packets must have been dropped, not displaced; the + # earlier ``_MAX_DEFERRED_PER_ADDR`` entries are the ones retained. + retained = [incoming.data for incoming in protocol._deferred[source_ip]] + assert retained == packets[: const._MAX_DEFERRED_PER_ADDR] + + zc.close() + + +def test_tc_bit_total_addrs_is_bounded(quick_timing: None) -> None: + """Distinct addrs with deferred state must not exceed ``_MAX_DEFERRED_ADDRS``.""" + zc = Zeroconf(interfaces=["127.0.0.1"]) + _wait_for_start(zc) + protocol = zc.engine.protocols[0] + + raw = _make_distinct_tc_packets(1)[0] + extra = 4 + addrs = [_synthetic_source_ip(i) for i in range(const._MAX_DEFERRED_ADDRS + extra)] + + # Push the reassembly timer well past any possible test runtime + # so the bound under test is the only thing that can drop entries; + # without this, PyPy / slow runners can fire timers between the + # last enqueue and the assertion. + with patch.object(_listener, "_TC_DELAY_RANDOM_INTERVAL", (60_000, 60_001)): + for source_ip in addrs: + threadsafe_query(zc, protocol, r.DNSIncoming(raw), source_ip, const._MDNS_PORT, Mock(), ()) + + assert len(protocol._deferred) == const._MAX_DEFERRED_ADDRS + assert len(protocol._timers) == const._MAX_DEFERRED_ADDRS + + zc.close() + + +def test_tc_bit_eviction_drops_oldest_addr(quick_timing: None) -> None: + """Adding a new addr at capacity drops the oldest insertion (FIFO).""" + zc = Zeroconf(interfaces=["127.0.0.1"]) + _wait_for_start(zc) + protocol = zc.engine.protocols[0] + + raw = _make_distinct_tc_packets(1)[0] + fillers = [_synthetic_source_ip(i) for i in range(const._MAX_DEFERRED_ADDRS)] + new_addr = _synthetic_source_ip(const._MAX_DEFERRED_ADDRS) + oldest = fillers[0] + + with patch.object(_listener, "_TC_DELAY_RANDOM_INTERVAL", (60_000, 60_001)): + for source_ip in fillers: + threadsafe_query(zc, protocol, r.DNSIncoming(raw), source_ip, const._MDNS_PORT, Mock(), ()) + assert len(protocol._deferred) == const._MAX_DEFERRED_ADDRS + assert oldest in protocol._deferred + + # One more distinct addr must evict the oldest insertion-order entry. + threadsafe_query(zc, protocol, r.DNSIncoming(raw), new_addr, const._MDNS_PORT, Mock(), ()) + assert oldest not in protocol._deferred + assert oldest not in protocol._timers + assert new_addr in protocol._deferred + assert len(protocol._deferred) == const._MAX_DEFERRED_ADDRS + + zc.close() + + @pytest.mark.asyncio async def test_open_close_twice_from_async() -> None: """Test we can close twice from a coroutine when using Zeroconf.