1
+ import json
2
+ import os
1
3
from collections import defaultdict
2
4
from dataclasses import (
3
5
dataclass ,
4
6
field ,
5
7
)
6
8
from enum import Enum
7
- import json
8
- import os
9
9
from pathlib import Path
10
10
from typing import (
11
11
TYPE_CHECKING ,
@@ -47,17 +47,20 @@ class ItemStatus(Enum):
47
47
FAILED = "failed"
48
48
SKIPPED = "skipped"
49
49
50
- class _MockPytestObject :
51
- def __init__ (self , collected_item ):
50
+
51
+ class _FakePytestObject :
52
+ def __init__ (self , collected_item : dict [str , str ]) -> None :
52
53
self .__module__ = collected_item ["modulename" ]
53
54
self .__name__ = collected_item ["methodname" ]
54
55
55
- class _MockPytestItem :
56
- def __init__ (self , collected_item : dict [str , Any ]) -> None :
56
+
57
+ class _FakePytestItem :
58
+ def __init__ (self , collected_item : dict [str , str ]) -> None :
57
59
self .nodeid = collected_item ["nodeid" ]
58
60
self .name = collected_item ["name" ]
59
61
self .path = Path (collected_item ["path" ])
60
- self .obj = _MockPytestObject (collected_item )
62
+ self .obj = _FakePytestObject (collected_item )
63
+
61
64
62
65
@dataclass
63
66
class SnapshotSession :
@@ -140,6 +143,24 @@ def ran_item(
140
143
except ValueError :
141
144
pass # if we don't understand the outcome, leave the item as "not run"
142
145
146
+ def _merge_collected_items (self , collected_items : list [dict [str , str ]]) -> None :
147
+ for collected_item in collected_items :
148
+ custom_item = _FakePytestItem (collected_item )
149
+ if not any (
150
+ t .nodeid == custom_item .nodeid and t .name == custom_item .nodeid
151
+ for t in self ._collected_items
152
+ ):
153
+ self ._collected_items .add (custom_item ) # type: ignore[arg-type]
154
+
155
+ def _merge_selected_items (self , selected_items : dict [str , str ]) -> None :
156
+ for key , selected_item in selected_items .items ():
157
+ if key in self ._selected_items :
158
+ status = ItemStatus (selected_item )
159
+ if status != ItemStatus .NOT_RUN :
160
+ self ._selected_items [key ] = status
161
+ else :
162
+ self ._selected_items [key ] = ItemStatus (selected_item )
163
+
143
164
def finish (self ) -> int :
144
165
exitstatus = 0
145
166
self .flush_snapshot_write_queue ()
@@ -152,12 +173,13 @@ def finish(self) -> int:
152
173
)
153
174
154
175
if is_xdist_worker ():
176
+ worker_count = os .getenv ("PYTEST_XDIST_WORKER_COUNT" )
177
+ with open (".pytest_syrupy_worker_count" , "w" , encoding = "utf-8" ) as f :
178
+ f .write (worker_count ) # type: ignore[arg-type]
155
179
with open (
156
- f"/workspaces/home-assistant-core/.syrupy.worker_count.txt" , "w"
157
- ) as f :
158
- f .write (os .getenv ("PYTEST_XDIST_WORKER_COUNT" ))
159
- with open (
160
- f"/workspaces/home-assistant-core/.syrupy.{ os .getenv ("PYTEST_XDIST_WORKER" )} .txt" , "w"
180
+ f".pytest_syrupy_{ os .getenv ("PYTEST_XDIST_WORKER" )} _result" ,
181
+ "w" ,
182
+ encoding = "utf-8" ,
161
183
) as f :
162
184
json .dump (self .report .serialize (), f , indent = 2 )
163
185
return exitstatus
@@ -166,31 +188,23 @@ def finish(self) -> int:
166
188
# Until this is implemented, running syrupy with pytest-xdist is only
167
189
# partially functional.
168
190
return exitstatus
169
-
191
+
170
192
worker_count = None
171
193
try :
172
- with open (f"/workspaces/home-assistant-core/.syrupy.worker_count.txt " , "r " ) as f :
194
+ with open (".pytest_syrupy_worker_count " , encoding = "utf-8 " ) as f :
173
195
worker_count = f .read ()
196
+ os .remove (".pytest_syrupy_worker_count" )
174
197
except FileNotFoundError :
175
198
pass
176
199
177
200
if worker_count :
178
201
for i in range (int (worker_count )):
179
- with open (f"/workspaces/home-assistant-core/.syrupy.gw { i } .txt " , "r " ) as f :
202
+ with open (f".pytest_syrupy_gw { i } _result " , encoding = "utf-8 " ) as f :
180
203
data = json .load (f )
181
- for collected_item in data ["_collected_items" ]:
182
- custom_item = _MockPytestItem (collected_item )
183
- if not any (t .nodeid == custom_item .nodeid and t .name == custom_item .nodeid for t in self ._collected_items ):
184
- self ._collected_items .add (custom_item )
185
- for key , selected_item in data ["_selected_items" ].items ():
186
- if key in self ._selected_items :
187
- status = ItemStatus (selected_item )
188
- if status != ItemStatus .NOT_RUN :
189
- self ._selected_items [key ] = status
190
- else :
191
- self ._selected_items [key ] = ItemStatus (selected_item )
192
-
204
+ self ._merge_collected_items (data ["_collected_items" ])
205
+ self ._merge_selected_items (data ["_selected_items" ])
193
206
self .report .merge_serialized (data )
207
+ os .remove (f".pytest_syrupy_gw{ i } _result" )
194
208
195
209
if self .report .num_unused :
196
210
if self .update_snapshots :
0 commit comments