Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/zeroconf/_listener.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ cdef bint TYPE_CHECKING
cdef cython.uint _MAX_MSG_ABSOLUTE
cdef cython.uint _DUPLICATE_PACKET_SUPPRESSION_INTERVAL
cdef cython.uint _RECENT_PACKETS_MAX
cdef cython.uint _MAX_DEFERRED_ADDRS
cdef cython.uint _MAX_DEFERRED_PER_ADDR


cdef class AsyncListener:
Expand All @@ -41,6 +43,8 @@ cdef class AsyncListener:

cdef _cancel_any_timers_for_addr(self, object addr)

cdef _evict_oldest_deferred(self)

@cython.locals(deadline=object, fire_at=double)
cdef double _compute_deferred_fire_at(self, object addr, double now, double delay)

Expand Down
33 changes: 32 additions & 1 deletion src/zeroconf/_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,13 @@
from ._protocol.incoming import DNSIncoming
from ._transport import _WrappedTransport, make_wrapped_transport
from ._utils.time import current_time_millis, millis_to_seconds
from .const import _DUPLICATE_PACKET_SUPPRESSION_INTERVAL, _MAX_MSG_ABSOLUTE, _RECENT_PACKETS_MAX
from .const import (
_DUPLICATE_PACKET_SUPPRESSION_INTERVAL,
_MAX_DEFERRED_ADDRS,
_MAX_DEFERRED_PER_ADDR,
_MAX_MSG_ABSOLUTE,
_RECENT_PACKETS_MAX,
)

if TYPE_CHECKING:
from ._core import Zeroconf
Expand Down Expand Up @@ -240,7 +246,17 @@ def handle_query_or_defer(
self._respond_query(msg, addr, port, transport, v6_flow_scope)
return

if addr not in self._deferred and len(self._deferred) >= _MAX_DEFERRED_ADDRS:
# Bound total deferred addrs so a spoofed-source flood
# cannot keep adding distinct entries; evict the oldest
# (insertion-order) entry and discard its in-flight queue.
self._evict_oldest_deferred()

deferred = self._deferred.setdefault(addr, [])
if len(deferred) >= _MAX_DEFERRED_PER_ADDR:
# Bound per-addr queue length; further fragments from the
# same source are dropped until the timer flushes.
return
# If we get the same packet we ignore it
for incoming in reversed(deferred):
if incoming.data == msg.data:
Expand Down Expand Up @@ -293,6 +309,21 @@ def _cancel_any_timers_for_addr(self, addr: _str) -> None:
if addr in self._timers:
self._timers.pop(addr).cancel()

def _evict_oldest_deferred(self) -> None:
"""Discard the oldest deferred addr's reassembly state.

Used when ``_MAX_DEFERRED_ADDRS`` would be exceeded; the
evicted addr's queue and timer are dropped without firing, so
the bound holds even when an attacker rotates source IPs.
Eviction is FIFO (oldest by first-seen, via dict insertion
order) rather than LRU so an active flooder cannot pin its
slots by re-sending into the same addr.
"""
oldest_addr = next(iter(self._deferred))
self._cancel_any_timers_for_addr(oldest_addr)
self._deferred_deadlines.pop(oldest_addr, None)
del self._deferred[oldest_addr]

def _respond_query(
self,
msg: DNSIncoming | None,
Expand Down
14 changes: 14 additions & 0 deletions src/zeroconf/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,20 @@
# flooding distinct questions (RFC 6762 §7.3, defense-in-depth).
_MAX_QUESTION_HISTORY_ENTRIES = 10000

# Per-addr cap on the number of truncated (TC-bit) packets retained for
# RFC 6762 §18.5 reassembly. The spec anticipates only a handful of
# segments per truncated query; 16 is well above legitimate need and
# keeps the per-arrival dedup scan a constant-time cost under a flood.
_MAX_DEFERRED_PER_ADDR = 16

# Per-listener cap on the number of distinct addrs with in-flight
# TC-deferral state. Each entry can hold up to _MAX_DEFERRED_PER_ADDR
# packets of up to _MAX_MSG_ABSOLUTE bytes; 512 leaves headroom for a
# legitimate burst (LAN-wide power-resume / boot storm where many
# devices announce at once) while bounding worst-case memory at
# ~72 MB even when a peer floods with spoofed source IPs.
_MAX_DEFERRED_ADDRS = 512
Comment thread
bdraco marked this conversation as resolved.

_DNS_PACKET_HEADER_LEN = 12

_MAX_MSG_TYPICAL = 1460 # unused
Expand Down
97 changes: 96 additions & 1 deletion tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import pytest

import zeroconf as r
from zeroconf import NotRunningException, Zeroconf, const, current_time_millis
from zeroconf import NotRunningException, Zeroconf, _listener, const, current_time_millis
from zeroconf._listener import _TC_DELAY_RANDOM_INTERVAL, AsyncListener, _WrappedTransport
from zeroconf._protocol.incoming import DNSIncoming
from zeroconf.asyncio import AsyncZeroconf
Expand Down Expand Up @@ -794,6 +794,101 @@ def test_tc_bit_defer_window_is_bounded():
zc.close()


def _make_distinct_tc_packets(count: int, name_prefix: str = "q") -> list[bytes]:
"""Generate ``count`` byte-distinct TC-flagged query packets for flood inputs."""
packets = []
for i in range(count):
out = r.DNSOutgoing(const._FLAGS_QR_QUERY | const._FLAGS_TC)
out.add_question(r.DNSQuestion(f"{name_prefix}{i}._tcp.local.", const._TYPE_PTR, const._CLASS_IN))
packets.append(out.packets()[0])
return packets


def _synthetic_source_ip(i: int) -> str:
"""Distinct synthetic source IPs from the documentation ranges."""
if i < 256:
return f"203.0.113.{i}"
if i < 512:
return f"198.51.100.{i - 256}"
return f"192.0.2.{i - 512}"


def test_tc_bit_per_addr_queue_is_bounded(quick_timing: None) -> None:
"""Per-addr deferred queue must not grow past ``_MAX_DEFERRED_PER_ADDR``."""
zc = Zeroconf(interfaces=["127.0.0.1"])
_wait_for_start(zc)
protocol = zc.engine.protocols[0]
source_ip = "203.0.113.21"

extra = 4
packets = _make_distinct_tc_packets(const._MAX_DEFERRED_PER_ADDR + extra)

# Push the reassembly timer well past any possible test runtime
# so the bound under test is the only thing that can drop entries.
with patch.object(_listener, "_TC_DELAY_RANDOM_INTERVAL", (60_000, 60_001)):
for raw in packets:
threadsafe_query(zc, protocol, r.DNSIncoming(raw), source_ip, const._MDNS_PORT, Mock(), ())

assert len(protocol._deferred[source_ip]) == const._MAX_DEFERRED_PER_ADDR
# Last ``extra`` packets must have been dropped, not displaced; the
# earlier ``_MAX_DEFERRED_PER_ADDR`` entries are the ones retained.
retained = [incoming.data for incoming in protocol._deferred[source_ip]]
assert retained == packets[: const._MAX_DEFERRED_PER_ADDR]

zc.close()


def test_tc_bit_total_addrs_is_bounded(quick_timing: None) -> None:
"""Distinct addrs with deferred state must not exceed ``_MAX_DEFERRED_ADDRS``."""
zc = Zeroconf(interfaces=["127.0.0.1"])
_wait_for_start(zc)
protocol = zc.engine.protocols[0]

raw = _make_distinct_tc_packets(1)[0]
extra = 4
addrs = [_synthetic_source_ip(i) for i in range(const._MAX_DEFERRED_ADDRS + extra)]

# Push the reassembly timer well past any possible test runtime
# so the bound under test is the only thing that can drop entries;
# without this, PyPy / slow runners can fire timers between the
# last enqueue and the assertion.
with patch.object(_listener, "_TC_DELAY_RANDOM_INTERVAL", (60_000, 60_001)):
for source_ip in addrs:
threadsafe_query(zc, protocol, r.DNSIncoming(raw), source_ip, const._MDNS_PORT, Mock(), ())

assert len(protocol._deferred) == const._MAX_DEFERRED_ADDRS
assert len(protocol._timers) == const._MAX_DEFERRED_ADDRS

zc.close()


def test_tc_bit_eviction_drops_oldest_addr(quick_timing: None) -> None:
"""Adding a new addr at capacity drops the oldest insertion (FIFO)."""
zc = Zeroconf(interfaces=["127.0.0.1"])
_wait_for_start(zc)
protocol = zc.engine.protocols[0]

raw = _make_distinct_tc_packets(1)[0]
fillers = [_synthetic_source_ip(i) for i in range(const._MAX_DEFERRED_ADDRS)]
new_addr = _synthetic_source_ip(const._MAX_DEFERRED_ADDRS)
oldest = fillers[0]

with patch.object(_listener, "_TC_DELAY_RANDOM_INTERVAL", (60_000, 60_001)):
for source_ip in fillers:
threadsafe_query(zc, protocol, r.DNSIncoming(raw), source_ip, const._MDNS_PORT, Mock(), ())
assert len(protocol._deferred) == const._MAX_DEFERRED_ADDRS
assert oldest in protocol._deferred

# One more distinct addr must evict the oldest insertion-order entry.
threadsafe_query(zc, protocol, r.DNSIncoming(raw), new_addr, const._MDNS_PORT, Mock(), ())
assert oldest not in protocol._deferred
assert oldest not in protocol._timers
assert new_addr in protocol._deferred
assert len(protocol._deferred) == const._MAX_DEFERRED_ADDRS

zc.close()
Comment thread
bdraco marked this conversation as resolved.


@pytest.mark.asyncio
async def test_open_close_twice_from_async() -> None:
"""Test we can close twice from a coroutine when using Zeroconf.
Expand Down
Loading