Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 66 additions & 75 deletions tests/services/test_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -522,97 +522,88 @@ def test_get_info_single(self):
service_address = "10.0.1.2"

service_info = None
send_event = Event()
service_info_event = Event()

last_sent: r.DNSOutgoing | None = None
ttl = 120
response_records = [
r.DNSText(
service_name,
const._TYPE_TXT,
const._CLASS_IN | const._CLASS_UNIQUE,
ttl,
service_text,
),
r.DNSService(
service_name,
const._TYPE_SRV,
const._CLASS_IN | const._CLASS_UNIQUE,
ttl,
0,
0,
80,
service_server,
),
r.DNSAddress(
service_server,
const._TYPE_A,
const._CLASS_IN | const._CLASS_UNIQUE,
ttl,
socket.inet_pton(socket.AF_INET, service_address),
),
]

def send(out, addr=const._MDNS_ADDR, port=const._MDNS_PORT, v6_flow_scope=()):
"""Sends an outgoing packet."""
nonlocal last_sent
def mock_incoming_msg(records: Iterable[r.DNSRecord]) -> r.DNSIncoming:
generated = r.DNSOutgoing(const._FLAGS_QR_RESPONSE)
for record in records:
generated.add_answer_at_time(record, 0)
return r.DNSIncoming(generated.packets()[0])

last_sent = out
send_event.set()
sent_queries: list[r.DNSOutgoing] = []

def send(out, addr=const._MDNS_ADDR, port=const._MDNS_PORT, v6_flow_scope=()):
"""Capture each query and, on the first one, fill the cache
inline so the next iteration of `async_request` finds
`_is_complete=True` and exits without sending another query.

Running the inject from inside `send` keeps it on the event
loop thread and atomic with the first send — eliminating the
test-thread → `run_coroutine_threadsafe` race that flaked
under PyPy + use_cython when `quick_request_timing` shortens
the inter-iteration delay to ~15ms.
"""
sent_queries.append(out)
if len(sent_queries) == 1:
zc.record_manager.async_updates_from_response(mock_incoming_msg(response_records))

def get_service_info_helper(zc, type, name):
nonlocal service_info
service_info = zc.get_service_info(type, name)
service_info_event.set()

# patch the zeroconf send
with patch.object(zc, "async_send", send):

def mock_incoming_msg(records: Iterable[r.DNSRecord]) -> r.DNSIncoming:
generated = r.DNSOutgoing(const._FLAGS_QR_RESPONSE)

for record in records:
generated.add_answer_at_time(record, 0)

return r.DNSIncoming(generated.packets()[0])

def get_service_info_helper(zc, type, name):
nonlocal service_info
service_info = zc.get_service_info(type, name)
service_info_event.set()

try:
ttl = 120
helper_thread = threading.Thread(
target=get_service_info_helper,
args=(zc, service_type, service_name),
)
helper_thread.start()
# Positive wait — the first query fires within
# `_LISTENER_TIME` + jitter (~15ms under
# `quick_request_timing`, ~320ms without).
wait_time = 1

# Expect query for SRV, TXT, A, AAAA
send_event.wait(wait_time)
assert last_sent is not None
assert len(last_sent.questions) == 4
assert r.DNSQuestion(service_name, const._TYPE_SRV, const._CLASS_IN) in last_sent.questions
assert r.DNSQuestion(service_name, const._TYPE_TXT, const._CLASS_IN) in last_sent.questions
assert r.DNSQuestion(service_name, const._TYPE_A, const._CLASS_IN) in last_sent.questions
assert r.DNSQuestion(service_name, const._TYPE_AAAA, const._CLASS_IN) in last_sent.questions
assert service_info is None

# Expect no further queries — under `quick_request_timing`
# the next query would have fired ~15ms after the previous
# send, so 100ms is plenty of headroom for the negative
# assertion.
last_sent = None
send_event.clear()
_inject_response(
zc,
mock_incoming_msg(
[
r.DNSText(
service_name,
const._TYPE_TXT,
const._CLASS_IN | const._CLASS_UNIQUE,
ttl,
service_text,
),
r.DNSService(
service_name,
const._TYPE_SRV,
const._CLASS_IN | const._CLASS_UNIQUE,
ttl,
0,
0,
80,
service_server,
),
r.DNSAddress(
service_server,
const._TYPE_A,
const._CLASS_IN | const._CLASS_UNIQUE,
ttl,
socket.inet_pton(socket.AF_INET, service_address),
),
]
),
)
send_event.wait(0.1)
assert last_sent is None
# Helper should complete promptly — the inline inject in
# `send` populates the cache before the request loop's
# next iteration.
service_info_event.wait(1)
assert service_info is not None

# First (and only) query: QU for SRV/TXT/A/AAAA.
assert len(sent_queries) == 1
first_sent = sent_queries[0]
assert len(first_sent.questions) == 4
assert r.DNSQuestion(service_name, const._TYPE_SRV, const._CLASS_IN) in first_sent.questions
assert r.DNSQuestion(service_name, const._TYPE_TXT, const._CLASS_IN) in first_sent.questions
assert r.DNSQuestion(service_name, const._TYPE_A, const._CLASS_IN) in first_sent.questions
assert r.DNSQuestion(service_name, const._TYPE_AAAA, const._CLASS_IN) in first_sent.questions

finally:
helper_thread.join()
zc.remove_all_service_listeners()
Expand Down
Loading