diff --git a/src/DynamicData.Tests/Cache/CrossCacheDeadlockStressTest.cs b/src/DynamicData.Tests/Cache/CrossCacheDeadlockStressTest.cs
new file mode 100644
index 00000000..110dcd5c
--- /dev/null
+++ b/src/DynamicData.Tests/Cache/CrossCacheDeadlockStressTest.cs
@@ -0,0 +1,869 @@
+// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved.
+// Roland Pheasant licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for full license information.
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Reactive;
+using System.Reactive.Disposables;
+using System.Reactive.Linq;
+using System.Reactive.Subjects;
+using System.Reactive.Threading.Tasks;
+using System.Threading;
+using System.Threading.Tasks;
+
+using Bogus;
+
+using DynamicData.Binding;
+using DynamicData.Kernel;
+
+using FluentAssertions;
+
+using Xunit;
+
+namespace DynamicData.Tests.Cache;
+
+/// <summary>
+/// Mega cross-cache stress test exercising every operator migrated from
+/// Synchronize to SynchronizeSafe in multi-threaded bidirectional pipelines.
+/// Every numeric parameter is derived from a seeded Randomizer (deterministic
+/// but not hardcoded). Proves: no deadlocks, correct final state, Rx compliance.
+/// </summary>
+public sealed class CrossCacheDeadlockStressTest
+{
+ // ════════════════════════════════════════════════════════════════
+ // Bound constants — ONLY the seed and Min/Max bounds are hardcoded.
+ // Every actual test value is derived from the seeded Randomizer.
+ // ════════════════════════════════════════════════════════════════
+
+ private const int Seed = 42;
+
+ // Market counts
+ private const int SourceMarketCountMin = 80;
+ private const int SourceMarketCountMax = 120;
+ private const int OverlappingCountMin = 5;
+ private const int OverlappingCountMax = 15;
+ private const int TreeMarketCountMin = 12;
+ private const int TreeMarketCountMax = 25;
+
+ // Per-market price generation
+ private const int PricesPerMarketMin = 2;
+ private const int PricesPerMarketMax = 8;
+
+ // Market property ranges
+ private const int PriorityMin = 1;
+ private const int PriorityMax = 10;
+ private const double RatingMin = 1.0;
+ private const double RatingMax = 10.0;
+ private const int RegionCountMin = 3;
+ private const int RegionCountMax = 7;
+
+ // Price ranges
+ private const decimal PriceMin = 1.0m;
+ private const decimal PriceMax = 500.0m;
+
+ // Pipeline parameters
+ private const double RatingFilterThresholdMin = 2.0;
+ private const double RatingFilterThresholdMax = 5.0;
+ private const double TransformMultiplierMin = 1.5;
+ private const double TransformMultiplierMax = 3.0;
+ private const int PageSizeMin = 20;
+ private const int PageSizeMax = 60;
+ private const int VirtualSizeMin = 15;
+ private const int VirtualSizeMax = 40;
+
+ // Stress parameters
+ private const int WriterThreadCountMin = 2;
+ private const int WriterThreadCountMax = 6;
+ private const int RatingMutationsMin = 10;
+ private const int RatingMutationsMax = 30;
+ private const int RegionMutationsMin = 5;
+ private const int RegionMutationsMax = 15;
+
+ // ID range spacing (generous gaps to prevent overlap)
+ private const int IdRangeSpacing = 10_000;
+
+ // Timeout
+ private const int TimeoutSecondsMin = 30;
+ private const int TimeoutSecondsMax = 60;
+
+ // ════════════════════════════════════════════════════════════════
+ // Domain types
+ // ════════════════════════════════════════════════════════════════
+
+ private sealed class StressMarket : AbstractNotifyPropertyChanged, IDisposable
+ {
+ private double _rating;
+ private string _region;
+
+ public StressMarket(int id, string name, string region, int priority, double rating, int? parentId = null)
+ {
+ Id = id;
+ Name = name;
+ _region = region;
+ Priority = priority;
+ _rating = rating;
+ ParentId = parentId;
+            Prices = new SourceCache<StressPrice, int>(p => p.Id);
+ }
+
+ public int Id { get; }
+
+ public string Name { get; }
+
+ public string Region
+ {
+ get => _region;
+ set => SetAndRaise(ref _region, value);
+ }
+
+ public int Priority { get; }
+
+ public double Rating
+ {
+ get => _rating;
+ set => SetAndRaise(ref _rating, value);
+ }
+
+ public int? ParentId { get; }
+
+        public SourceCache<StressPrice, int> Prices { get; }
+
+        public IObservable<IChangeSet<StressPrice, int>> LatestPrices => Prices.Connect();
+
+ public void Dispose() => Prices.Dispose();
+
+ public override string ToString() => $"Market({Id}, {Name}, R={Rating:F1}, P={Priority})";
+ }
+
+ private sealed class StressPrice(int id, int marketId, decimal price)
+ {
+ public int Id { get; } = id;
+
+ public int MarketId { get; } = marketId;
+
+ public decimal Price { get; set; } = price;
+
+ public override string ToString() => $"Price({Id}, M={MarketId}, ${Price:F2})";
+ }
+
+    private sealed class RatingDescComparer : IComparer<StressMarket>
+ {
+ public static RatingDescComparer Instance { get; } = new();
+
+ public int Compare(StressMarket? x, StressMarket? y) =>
+ (y?.Rating ?? 0).CompareTo(x?.Rating ?? 0);
+ }
+
+    private sealed class PriorityAscComparer : IComparer<StressMarket>
+ {
+ public static PriorityAscComparer Instance { get; } = new();
+
+ public int Compare(StressMarket? x, StressMarket? y) =>
+ (x?.Priority ?? 0).CompareTo(y?.Priority ?? 0);
+ }
+
+    private sealed class PriceDescComparer : IComparer<StressPrice>
+ {
+ public static PriceDescComparer Instance { get; } = new();
+
+ public int Compare(StressPrice? x, StressPrice? y) =>
+ (y?.Price ?? 0).CompareTo(x?.Price ?? 0);
+ }
+
+ // ════════════════════════════════════════════════════════════════
+ // The Test
+ // ════════════════════════════════════════════════════════════════
+
+ [Fact]
+ public async Task AllOperators_CrossCache_NoDeadlock_CorrectResults()
+ {
+ // ── Derive ALL test parameters from seeded Randomizer ────────
+ var rand = new Randomizer(Seed);
+
+ var sourceACount = rand.Number(SourceMarketCountMin, SourceMarketCountMax);
+ var sourceBCount = rand.Number(SourceMarketCountMin, SourceMarketCountMax);
+ var overlappingCount = rand.Number(OverlappingCountMin, OverlappingCountMax);
+ var treeCount = rand.Number(TreeMarketCountMin, TreeMarketCountMax);
+ var regionCount = rand.Number(RegionCountMin, RegionCountMax);
+ var regions = Enumerable.Range(0, regionCount).Select(i => $"Region-{i}").ToArray();
+ var ratingThreshold = rand.Double(RatingFilterThresholdMin, RatingFilterThresholdMax);
+ var transformMultiplier = rand.Double(TransformMultiplierMin, TransformMultiplierMax);
+ var pageSize = rand.Number(PageSizeMin, PageSizeMax);
+ var virtualSize = rand.Number(VirtualSizeMin, VirtualSizeMax);
+ var writerThreads = rand.Number(WriterThreadCountMin, WriterThreadCountMax);
+ var ratingMutations = rand.Number(RatingMutationsMin, RatingMutationsMax);
+ var regionMutations = rand.Number(RegionMutationsMin, RegionMutationsMax);
+ var timeoutSeconds = rand.Number(TimeoutSecondsMin, TimeoutSecondsMax);
+
+ // ID ranges (non-overlapping, derived from spacing)
+ var idA = rand.Number(1, IdRangeSpacing / 2);
+ var idB = idA + IdRangeSpacing;
+ var idOverlap = idB + IdRangeSpacing;
+ var idForward = idOverlap + IdRangeSpacing;
+ var idReverse = idForward + IdRangeSpacing;
+ var idTree = idReverse + IdRangeSpacing;
+
+ // ── Data Generation ─────────────────────────────────────────
+ var marketsA = GenerateMarkets(rand, idA, sourceACount, regions);
+ var marketsB = GenerateMarkets(rand, idB, sourceBCount, regions);
+ var overlapping = GenerateMarkets(rand, idOverlap, overlappingCount, regions);
+ var treeMarkets = GenerateTreeMarkets(rand, idTree, treeCount, regions);
+
+ // ── Source Caches ───────────────────────────────────────────
+        using var sourceA = new SourceCache<StressMarket, int>(m => m.Id);
+        using var sourceB = new SourceCache<StressMarket, int>(m => m.Id);
+        using var treeSource = new SourceCache<StressMarket, int>(m => m.Id);
+
+ // ── Subjects for dynamic parameters ─────────────────────────
+        using var pageRequests = new BehaviorSubject<IPageRequest>(new PageRequest(1, pageSize));
+        using var virtualRequests = new BehaviorSubject<IVirtualRequest>(new VirtualRequest(0, virtualSize));
+        using var pauseBatch = new BehaviorSubject<bool>(false);
+        using var forceTransform = new Subject<Func<StressMarket, bool>>();
+        using var switchSource = new BehaviorSubject<IObservable<IChangeSet<StressMarket, int>>>(sourceA.Connect());
+        using var comparerSubject = new BehaviorSubject<IComparer<StressMarket>>(RatingDescComparer.Instance);
+
+ // Stop signal for operators with a library gap — they don't forward OnCompleted:
+ // Static Combiner (Or/And/Except), BatchIf, TransformToTree, Switch
+        using var stopSignal = new Subject<Unit>();
+
+ // ── Completion tracking ─────────────────────────────────────
+        var completionTasks = new List<Task>();
+        var completionNames = new List<string>();
+ using var subs = new CompositeDisposable();
+
+ // Helpers
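+        // TrackCache publishes the pipeline so its completion can be awaited via
+        // LastOrDefaultAsync() while the same shared stream is materialized into a cache;
+        // Connect() starts delivery once both consumers are attached.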
+        IObservableCache<TObj, TKey> TrackCache<TObj, TKey>(IObservable<IChangeSet<TObj, TKey>> pipeline, [System.Runtime.CompilerServices.CallerArgumentExpression(nameof(pipeline))] string? name = null)
+ where TObj : notnull where TKey : notnull
+ {
+ var pub = pipeline.Publish();
+ completionTasks.Add(pub.LastOrDefaultAsync().ToTask());
+ completionNames.Add(name ?? "?");
+ var cache = pub.AsObservableCache();
+ subs.Add(cache);
+ subs.Add(pub.Connect());
+ return cache;
+ }
+
+ // Bidirectional flows need writable SourceCaches
+        using var forwardTarget = new SourceCache<StressMarket, int>(m => m.Id);
+        using var reverseTarget = new SourceCache<StressMarket, int>(m => m.Id);
+
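+        // TrackIntoCache mirrors TrackCache but pumps the published changes into a
+        // writable SourceCache (via PopulateInto) so the downstream flow can feed back
+        // into another source.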
+        void TrackIntoCache(IObservable<IChangeSet<StressMarket, int>> pipeline, SourceCache<StressMarket, int> target, [System.Runtime.CompilerServices.CallerArgumentExpression(nameof(pipeline))] string? name = null)
+ {
+ var pub = pipeline.Publish();
+ completionTasks.Add(pub.LastOrDefaultAsync().ToTask());
+ completionNames.Add(name ?? "?");
+ subs.Add(pub.PopulateInto(target));
+ subs.Add(pub.Connect());
+ }
+
+ // ── Auto-dispose items removed from source caches ───────────
+ subs.Add(sourceA.Connect().DisposeMany().Subscribe());
+ subs.Add(sourceB.Connect().DisposeMany().Subscribe());
+
+ // ════════════════════════════════════════════════════════════
+ // FLOW 1 — Forward Bidirectional: sourceA → forwardTarget → sourceB
+ // Operators: AutoRefresh, Filter(dynamic), Transform(forceTransform),
+ // OnItemRemoved, DisposeMany, Sort, Page, BatchIf
+ // ════════════════════════════════════════════════════════════
+
+ var forwardRemovals = 0;
+ var forwardIdCounter = idForward;
+
+ TrackIntoCache(
+ sourceA.Connect()
+ .AutoRefresh(m => m.Rating) // AutoRefresh [1]
+ .Filter(m => m.Id >= idA && m.Id < idA + sourceACount // Filter(dynamic) [1]
+ && m.Rating >= ratingThreshold)
+ .Transform( // Transform(forceTransform) [1]
+ m => new StressMarket(
+ Interlocked.Increment(ref forwardIdCounter),
+ $"F-{m.Name}", m.Region, m.Priority,
+ m.Rating * transformMultiplier),
+ forceTransform)
+ .OnItemRemoved(m => // OnItemRemoved [1]
+ Interlocked.Increment(ref forwardRemovals))
+ .DisposeMany() // DisposeMany [1]
+ .Sort(RatingDescComparer.Instance) // Sort [1]
+ .Page(pageRequests) // Page [1]
+ .BatchIf(pauseBatch, false, (TimeSpan?)null) // BatchIf [1]
+ .TakeUntil(stopSignal),
+ forwardTarget);
+
+ subs.Add(forwardTarget.Connect().PopulateInto(sourceB));
+
+ // ════════════════════════════════════════════════════════════
+ // FLOW 2 — Reverse Bidirectional: sourceB → reverseTarget → sourceA
+ // Operators: AutoRefresh, Filter(dynamic), Sort, Virtualise
+ // ════════════════════════════════════════════════════════════
+
+ var reverseIdCounter = idReverse;
+
+ TrackIntoCache(
+ sourceB.Connect()
+ .AutoRefresh(m => m.Rating) // AutoRefresh [2]
+ .Filter(m => m.Id >= idB && m.Id < idB + sourceBCount // Filter(dynamic) [2]
+ && m.Rating >= ratingThreshold)
+ .Sort(RatingDescComparer.Instance) // Sort [2]
+ .Virtualise(virtualRequests) // Virtualise [1]
+ .Transform(m => new StressMarket( // Transform [2]
+ Interlocked.Increment(ref reverseIdCounter),
+ $"R-{m.Name}", m.Region, m.Priority, m.Rating))
+ .TakeUntil(stopSignal), // AutoRefresh doesn't forward OnCompleted
+ reverseTarget);
+
+ subs.Add(reverseTarget.Connect().PopulateInto(sourceA));
+
+ // Side chains
+ using var sortVirtResults = sourceB.Connect()
+ .SortAndVirtualize(comparerSubject, virtualRequests) // SortAndVirtualize [1]
+ .AsAggregator();
+
+        IQuery<StressMarket, int>? lastQuery = null;
+ var qwcTcs = new TaskCompletionSource();
+ subs.Add(sourceB.Connect()
+ .QueryWhenChanged() // QueryWhenChanged [1]
+ .Subscribe(q => lastQuery = q, ex => qwcTcs.TrySetException(ex), () => qwcTcs.TrySetResult()));
+ completionTasks.Add(qwcTcs.Task);
+ completionNames.Add("QueryWhenChanged-B");
+
+ // ════════════════════════════════════════════════════════════
+ // FLOW 3 — Joins: sourceA × sourceB
+ // Operators: FullJoin, InnerJoin, LeftJoin, RightJoin
+ // ════════════════════════════════════════════════════════════
+
+ var fullJoinCache = TrackCache(
+ sourceA.Connect().FullJoin( // FullJoin [1]
+ sourceB.Connect(), r => r.Id,
+ (key, left, right) =>
+ {
+ var src = left.HasValue ? left.Value : right.Value;
+ return new StressMarket(key, $"FJ-{src.Name}", src.Region, src.Priority, src.Rating);
+ }));
+
+ var innerJoinCache = TrackCache(
+ sourceA.Connect().InnerJoin( // InnerJoin [1]
+ sourceB.Connect(), r => r.Id,
+ (key, left, right) =>
+ new StressMarket(key.leftKey, $"IJ-{left.Name}", left.Region, left.Priority, right.Rating))
+ .ChangeKey(m => m.Id));
+
+ var leftJoinCache = TrackCache(
+ sourceA.Connect().LeftJoin( // LeftJoin [1]
+ sourceB.Connect(), r => r.Id,
+ (key, left, right) =>
+ new StressMarket(key, $"LJ-{left.Name}", left.Region, left.Priority,
+ right.HasValue ? right.Value.Rating : left.Rating)));
+
+ var rightJoinCache = TrackCache(
+ sourceA.Connect().RightJoin( // RightJoin [1]
+ sourceB.Connect(), r => r.Id,
+ (key, left, right) =>
+ new StressMarket(key, $"RJ-{right.Name}", right.Region, right.Priority,
+ left.HasValue ? left.Value.Rating : right.Rating)));
+
+ // ════════════════════════════════════════════════════════════
+ // FLOW 4 — Combiners + second join uses
+ // Operators: Or, And, Except, MergeChangeSets, FullJoin[2], InnerJoin[2]
+ // ════════════════════════════════════════════════════════════
+
+ var orCache = TrackCache(
+ sourceA.Connect().Or(sourceB.Connect()) // Or [1]
+ .TakeUntil(stopSignal));
+
+ var andCache = TrackCache(
+ sourceA.Connect().And(sourceB.Connect()) // And [1]
+ .TakeUntil(stopSignal));
+
+ var exceptCache = TrackCache(
+ sourceA.Connect().Except(sourceB.Connect()) // Except [1]
+ .TakeUntil(stopSignal));
+
+ var mergedCache = TrackCache(
+ new[] { sourceA.Connect(), sourceB.Connect() }
+ .MergeChangeSets()); // MergeChangeSets [1]
+
+ // Second join uses: join the join outputs together
+ var joinedJoinsCache = TrackCache(
+ fullJoinCache.Connect().FullJoin( // FullJoin [2]
+ rightJoinCache.Connect(), r => r.Id,
+ (key, left, right) =>
+ {
+ var src = left.HasValue ? left.Value : right.Value;
+ return new StressMarket(key, $"JJ-{src.Name}", src.Region, src.Priority, src.Rating);
+ }));
+
+ // Second InnerJoin on the overlapping subset
+ var innerJoin2Cache = TrackCache(
+ leftJoinCache.Connect().InnerJoin( // InnerJoin [2]
+ rightJoinCache.Connect(), r => r.Id,
+ (key, left, right) =>
+ new StressMarket(key.leftKey, $"IJ2-{left.Name}", left.Region, left.Priority, right.Rating))
+ .ChangeKey(m => m.Id));
+
+ // ════════════════════════════════════════════════════════════
+ // FLOW 5 — Groups: sourceA → grouped → flattened
+ // Operators: GroupOn, GroupOnImmutable, GroupOnObservable, MergeMany
+ // ════════════════════════════════════════════════════════════
+
+ var groupCache = TrackCache(
+ sourceA.Connect()
+ .Group(m => m.Region) // GroupOn [1]
+ .MergeMany(group => group.Cache.Connect())); // MergeMany [1]
+
+ using var immGroupAgg = sourceA.Connect()
+ .GroupWithImmutableState(m => m.Region) // GroupOnImmutable [1]
+ .AsAggregator();
+
+ var dynGroupCache = TrackCache(
+ sourceA.Connect()
+ .GroupOnObservable(m => m.WhenPropertyChanged(x => x.Region) // GroupOnObservable [1]
+ .Select(pv => pv.Value ?? regions[0]))
+ .MergeMany(group => group.Cache.Connect())
+ .TakeUntil(stopSignal)); // WhenPropertyChanged children don't complete
+
+ // Second GroupOn use on sourceB
+ var groupBCache = TrackCache(
+ sourceB.Connect()
+ .Group(m => m.Region) // GroupOn [2]
+ .MergeMany(group => group.Cache.Connect())); // MergeMany [3]
+
+ // ════════════════════════════════════════════════════════════
+ // FLOW 6 — MergeManyChangeSets (both overloads)
+ // ════════════════════════════════════════════════════════════
+
+ // 6a: Child comparer — highest price wins across markets
+ var childPriceCache = TrackCache(
+ sourceA.Connect()
+ .MergeManyChangeSets(m => m.LatestPrices, // MergeManyCS(child) [1]
+ PriceDescComparer.Instance));
+
+ // 6b: Source comparer + child comparer — priority then price
+ var sourcePriceCache = TrackCache(
+ sourceB.Connect()
+ .MergeManyChangeSets(m => m.LatestPrices, // MergeManyCS(source) [1]
+ PriorityAscComparer.Instance, PriceDescComparer.Instance));
+
+ // Second uses: reversed sources
+ var childPriceBCache = TrackCache(
+ sourceB.Connect()
+ .MergeManyChangeSets(m => m.LatestPrices, // MergeManyCS(child) [2]
+ PriceDescComparer.Instance));
+
+ var sourcePriceACache = TrackCache(
+ sourceA.Connect()
+ .MergeManyChangeSets(m => m.LatestPrices, // MergeManyCS(source) [2]
+ PriorityAscComparer.Instance, PriceDescComparer.Instance));
+
+ // ════════════════════════════════════════════════════════════
+ // FLOW 7 — Sort Variants, Switch, second BatchIf/Page/Virtualise
+ // ════════════════════════════════════════════════════════════
+
+        var boundListA = new List<StressMarket>();
+ subs.Add(sourceA.Connect()
+ .SortAndBind(boundListA, RatingDescComparer.Instance) // SortAndBind [1]
+ .Subscribe());
+
+        var boundListB = new List<StressMarket>();
+ subs.Add(sourceA.Connect()
+ .SortAndBind(boundListB, comparerSubject) // SortAndBind [2]
+ .Subscribe());
+
+ var switchCache = TrackCache(
+ switchSource.Switch() // Switch [1]
+ .TakeUntil(stopSignal));
+
+ // Second Page + Virtualise + BatchIf uses on sourceB
+        using var pageBSubject = new BehaviorSubject<IPageRequest>(new PageRequest(1, pageSize));
+        using var pauseB = new BehaviorSubject<bool>(false);
+
+ var pageBCache = TrackCache(
+ sourceB.Connect()
+ .Sort(PriorityAscComparer.Instance) // Sort [3]
+ .Page(pageBSubject) // Page [2]
+ .BatchIf(pauseB, false, (TimeSpan?)null) // BatchIf [2]
+ .TakeUntil(stopSignal));
+
+        using var virtBRequests = new BehaviorSubject<IVirtualRequest>(new VirtualRequest(0, virtualSize));
+ var virtBCache = TrackCache(
+ sourceB.Connect()
+ .Sort(PriorityAscComparer.Instance) // Sort [4]
+ .Virtualise(virtBRequests)); // Virtualise [2]
+
+ // ════════════════════════════════════════════════════════════
+ // FLOW 8 — TransformMany, TransformToTree, second OnItemRemoved
+ // ════════════════════════════════════════════════════════════
+
+ var allPricesACache = TrackCache(
+ sourceA.Connect()
+                .TransformMany(m => (IObservableCache<StressPrice, int>)m.Prices, // TransformMany [1]
+ p => p.Id));
+
+ var allPricesBCache = TrackCache(
+ sourceB.Connect()
+                .TransformMany(m => (IObservableCache<StressPrice, int>)m.Prices, // TransformMany [2]
+ p => p.Id));
+
+ // TransformToTree doesn't forward OnCompleted (library gap) — needs TakeUntil
+ var treeCache = TrackCache(
+ treeSource.Connect()
+ .TransformToTree(m => m.ParentId ?? 0) // TransformToTree [1]
+ .TakeUntil(stopSignal));
+
+ // Second OnItemRemoved + DisposeMany on sourceB
+ var reverseRemovals = 0;
+ subs.Add(sourceB.Connect()
+ .OnItemRemoved(m => Interlocked.Increment(ref reverseRemovals)) // OnItemRemoved [2]
+ .DisposeMany() // DisposeMany [2]
+ .Subscribe());
+
+ // Operators not covered here (AsyncDisposeMany, TransformAsync, TransformOnObservable,
+ // TransformManyAsync, SortAndPage, MergeManyListChangeSets) are exercised
+ // in their dedicated fixture tests under concurrent load.
+
+ // Second SortAndVirtualize on sourceA
+ using var sortVirtAResults = sourceA.Connect()
+ .SortAndVirtualize(comparerSubject, virtualRequests) // SortAndVirtualize [2]
+ .AsAggregator();
+
+ // Second QueryWhenChanged on sourceA
+        IQuery<StressMarket, int>? lastQueryA = null;
+ var qwcATcs = new TaskCompletionSource();
+ subs.Add(sourceA.Connect()
+ .QueryWhenChanged() // QueryWhenChanged [2]
+ .Subscribe(q => lastQueryA = q, ex => qwcATcs.TrySetException(ex), () => qwcATcs.TrySetResult()));
+ completionTasks.Add(qwcATcs.Task);
+ completionNames.Add("QueryWhenChanged-A");
+
+ // Second Switch + GroupOnImmutable + GroupOnObservable
+        using var switchSource2 = new BehaviorSubject<IObservable<IChangeSet<StressMarket, int>>>(sourceB.Connect());
+ var switchCache2 = TrackCache(
+ switchSource2.Switch() // Switch [2]
+ .TakeUntil(stopSignal));
+
+ using var immGroupBAgg = sourceB.Connect()
+ .GroupWithImmutableState(m => m.Region) // GroupOnImmutable [2]
+ .AsAggregator();
+
+ var dynGroupBCache = TrackCache(
+ sourceB.Connect()
+ .GroupOnObservable(m => m.WhenPropertyChanged(x => x.Region) // GroupOnObservable [2]
+ .Select(pv => pv.Value ?? regions[0]))
+ .MergeMany(group => group.Cache.Connect())
+ .TakeUntil(stopSignal)); // WhenPropertyChanged children don't complete
+
+ // Second LeftJoin + RightJoin + Or + And + Except + MergeChangeSets
+ var leftJoin2Cache = TrackCache(
+ sourceB.Connect().LeftJoin( // LeftJoin [2]
+ sourceA.Connect(), r => r.Id,
+ (key, left, right) =>
+ new StressMarket(key, $"LJ2-{left.Name}", left.Region, left.Priority,
+ right.HasValue ? right.Value.Rating : left.Rating)));
+
+ var rightJoin2Cache = TrackCache(
+ sourceB.Connect().RightJoin( // RightJoin [2]
+ sourceA.Connect(), r => r.Id,
+ (key, left, right) =>
+ new StressMarket(key, $"RJ2-{right.Name}", right.Region, right.Priority,
+ left.HasValue ? left.Value.Rating : right.Rating)));
+
+ var orCache2 = TrackCache(
+ sourceB.Connect().Or(sourceA.Connect()) // Or [2]
+ .TakeUntil(stopSignal));
+
+ var andCache2 = TrackCache(
+ sourceB.Connect().And(sourceA.Connect()) // And [2]
+ .TakeUntil(stopSignal));
+
+ var exceptCache2 = TrackCache(
+ sourceB.Connect().Except(sourceA.Connect()) // Except [2]
+ .TakeUntil(stopSignal));
+
+ var mergedCache2 = TrackCache(
+ new[] { sourceB.Connect(), sourceA.Connect() }
+ .MergeChangeSets()); // MergeChangeSets [2]
+
+ // Second TransformToTree using a different subset
+ var treeCache2 = TrackCache(
+ treeSource.Connect()
+ .Filter(m => m.ParentId.HasValue || m.Id < idTree + treeCount / 2)
+ .TransformToTree(m => m.ParentId ?? 0) // TransformToTree [2]
+ .TakeUntil(stopSignal));
+
+ // ════════════════════════════════════════════════════════════
+ // Multi-Threaded Writers
+ // ════════════════════════════════════════════════════════════
+
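+        // One barrier participant per writer task (writerThreads for A + writerThreads for B)
+        // plus one for the orchestrating test thread, which releases the writers and then joins them.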
+ var barrier = new Barrier(writerThreads * 2 + 1);
+ var slicesA = PartitionList(marketsA, writerThreads);
+ var slicesB = PartitionList(marketsB, writerThreads);
+        var writerTasks = new List<Task>();
+
+ for (var t = 0; t < writerThreads; t++)
+ {
+ var slice = slicesA[t];
+ var tRand = new Randomizer(Seed + t + 1);
+ writerTasks.Add(Task.Run(() =>
+ {
+ barrier.SignalAndWait();
+ foreach (var m in slice) sourceA.AddOrUpdate(m);
+ for (var i = 0; i < ratingMutations; i++)
+ slice[tRand.Number(0, slice.Count - 1)].Rating = tRand.Double(RatingMin, RatingMax);
+ for (var i = 0; i < regionMutations; i++)
+ slice[tRand.Number(0, slice.Count - 1)].Region = regions[tRand.Number(0, regionCount - 1)];
+ barrier.SignalAndWait();
+ }));
+ }
+
+ for (var t = 0; t < writerThreads; t++)
+ {
+ var slice = slicesB[t];
+ var tRand = new Randomizer(Seed + writerThreads + t + 1);
+ writerTasks.Add(Task.Run(() =>
+ {
+ barrier.SignalAndWait();
+ foreach (var m in slice) sourceB.AddOrUpdate(m);
+ for (var i = 0; i < ratingMutations; i++)
+ slice[tRand.Number(0, slice.Count - 1)].Rating = tRand.Double(RatingMin, RatingMax);
+ barrier.SignalAndWait();
+ }));
+ }
+
+ // ── Start writers ───────────────────────────────────────────
+ barrier.SignalAndWait();
+ pauseBatch.OnNext(true);
+ barrier.SignalAndWait();
+ await Task.WhenAll(writerTasks);
+ pauseBatch.OnNext(false);
+
+ // Post-write operations
+ sourceA.AddOrUpdate(overlapping);
+ sourceB.AddOrUpdate(overlapping);
+ treeSource.AddOrUpdate(treeMarkets);
+ forceTransform.OnNext(m => m.Rating > ratingThreshold);
+ switchSource.OnNext(sourceB.Connect());
+ switchSource2.OnNext(sourceA.Connect());
+ comparerSubject.OnNext(PriorityAscComparer.Instance);
+
+ // ── Teardown ────────────────────────────────────────────────
+ // 1. Signal stop for operators with library gaps (don't forward OnCompleted):
+ // Static Combiner (Or/And/Except), BatchIf, TransformToTree, Switch
+ stopSignal.OnNext(Unit.Default);
+ stopSignal.OnCompleted();
+
+ // ── Snapshot final state (bidirectional flows are frozen by stopSignal) ──
+        var finalAKeys = new HashSet<int>(sourceA.Keys);
+        var finalBKeys = new HashSet<int>(sourceB.Keys);
+
+ // 2. Complete all BehaviorSubjects so multi-source operators can complete
+ forceTransform.OnCompleted();
+ pageRequests.OnCompleted();
+ pageBSubject.OnCompleted();
+ virtualRequests.OnCompleted();
+ virtBRequests.OnCompleted();
+ comparerSubject.OnCompleted();
+ pauseBatch.OnCompleted();
+ pauseB.OnCompleted();
+ switchSource.OnCompleted();
+ switchSource2.OnCompleted();
+
+        // 3. Dispose source caches — fires OnCompleted on Connect() streams,
+ // DisposeMany auto-disposes inner price caches (completing MMCS/TransformMany)
+ sourceA.Dispose();
+ sourceB.Dispose();
+ treeSource.Dispose();
+
+        // 4. Dispose subscriptions — disconnects Publish, firing OnCompleted on
+ // all published streams (completes operators like AutoRefresh, TreeBuilder
+ // that don't propagate OnCompleted naturally)
+ subs.Dispose();
+
+        // 5. Wait for all completion tasks with timeout (deadlock detector)
+ var allCompleted = Task.WhenAll(completionTasks);
+ var timeout = Task.Delay(TimeSpan.FromSeconds(timeoutSeconds));
+ var finished = await Task.WhenAny(allCompleted, timeout);
+ if (!ReferenceEquals(finished, allCompleted))
+ {
+ var pending = completionTasks.Select((t2, i) => new { Index = i, t2.Status, Name = completionNames[i] })
+ .Where(x => x.Status != TaskStatus.RanToCompletion)
+ .Select(x => $"[{x.Index}] {x.Name} ({x.Status})").ToList();
+ pending.Should().BeEmpty($"all {completionTasks.Count} tasks should finish within {timeoutSeconds}s. Pending: {string.Join(", ", pending)}");
+ }
+
+ // ════════════════════════════════════════════════════════════
+ // Verification — exact contents with BeEquivalentTo
+ // ════════════════════════════════════════════════════════════
+
+ // Flow 1: Forward — filtered, transformed, paged subset of sourceA
+ forwardTarget.Count.Should().BeGreaterThan(0, "Flow1 should produce results");
+ forwardTarget.Count.Should().BeLessThanOrEqualTo(pageSize, "Page should limit");
+ forwardTarget.Items.Should().OnlyContain(m => m.Name.StartsWith("F-"), "Transform prefixes 'F-'");
+ forwardTarget.Items.Should().OnlyContain(
+ m => m.Rating >= ratingThreshold * transformMultiplier,
+ "Transform multiplies rating of items that passed filter");
+ forwardRemovals.Should().BeGreaterThan(0, "OnItemRemoved fires on rating mutation exits");
+
+ // Flow 2: Reverse — filtered, sorted, virtualized, transformed subset of sourceB
+ reverseTarget.Count.Should().BeGreaterThan(0, "Flow2 should produce results");
+ reverseTarget.Count.Should().BeLessThanOrEqualTo(virtualSize, "Virtualise limits");
+ reverseTarget.Items.Should().OnlyContain(m => m.Name.StartsWith("R-"), "Transform prefixes 'R-'");
+
+ // Flow 3: Joins — verify mathematical relationships hold
+ // Each cache may see a slightly different snapshot due to bidirectional flow timing,
+ // but the set-theoretic relationships must hold within each cache's own view.
+ fullJoinCache.Items.Should().OnlyContain(m => m.Name.StartsWith("FJ-"), "FullJoin prefixes 'FJ-'");
+ innerJoinCache.Items.Should().OnlyContain(m => m.Name.StartsWith("IJ-"), "InnerJoin prefixes 'IJ-'");
+ leftJoinCache.Items.Should().OnlyContain(m => m.Name.StartsWith("LJ-"), "LeftJoin prefixes 'LJ-'");
+ rightJoinCache.Items.Should().OnlyContain(m => m.Name.StartsWith("RJ-"), "RightJoin prefixes 'RJ-'");
+
+ // InnerJoin keys ⊂ FullJoin keys (intersection ⊂ union)
+        new HashSet<int>(innerJoinCache.Keys).IsSubsetOf(new HashSet<int>(fullJoinCache.Keys)).Should()
+ .BeTrue("InnerJoin ⊂ FullJoin");
+ // InnerJoin must have at least the overlapping keys
+ innerJoinCache.Count.Should().BeGreaterThanOrEqualTo(overlappingCount,
+ "InnerJoin finds at least overlapping items");
+
+ // Flow 4: Combiners — Or and Merged share the same Publish, so they're identical
+ orCache.Keys.Should().BeEquivalentTo(mergedCache.Keys, "Or = Merged (same Publish sources)");
+ // And ⊂ Or
+        new HashSet<int>(andCache.Keys).IsSubsetOf(new HashSet<int>(orCache.Keys)).Should()
+ .BeTrue("And ⊂ Or");
+ // Except ∩ And = ∅
+        new HashSet<int>(exceptCache.Keys).Overlaps(andCache.Keys).Should()
+ .BeFalse("Except ∩ And = ∅");
+ // Except ∪ And ∪ (items only in B) = Or
+        var exceptPlusAnd = new HashSet<int>(exceptCache.Keys);
+ exceptPlusAnd.UnionWith(andCache.Keys);
+        exceptPlusAnd.IsSubsetOf(new HashSet<int>(orCache.Keys)).Should()
+ .BeTrue("Except ∪ And ⊂ Or");
+
+ // Second joins — cross-verify with first joins (same sources, same completion)
+ leftJoin2Cache.Keys.Should().BeEquivalentTo(rightJoinCache.Keys, "LeftJoin2(B×A) = RightJoin(A×B)");
+ leftJoin2Cache.Items.Should().OnlyContain(m => m.Name.StartsWith("LJ2-"), "LeftJoin2 prefixes");
+ rightJoin2Cache.Keys.Should().BeEquivalentTo(leftJoinCache.Keys, "RightJoin2(B×A) = LeftJoin(A×B)");
+ rightJoin2Cache.Items.Should().OnlyContain(m => m.Name.StartsWith("RJ2-"), "RightJoin2 prefixes");
+ orCache2.Keys.Should().BeEquivalentTo(orCache.Keys, "Or2 = Or (same sources, same completion)");
+ andCache2.Keys.Should().BeEquivalentTo(andCache.Keys, "And2 = And (same sources)");
+ mergedCache2.Keys.Should().BeEquivalentTo(mergedCache.Keys, "MergedCache2 = Merged (same sources)");
+
+ // Flow 5: Groups — verify grouping preserves all items from same snapshot
+ groupCache.Items.Select(m => m.Region).Distinct().Count().Should()
+ .BeGreaterThan(1, "GroupOn creates multiple regions");
+ immGroupAgg.Data.Count.Should().BeGreaterThan(1, "GroupOnImmutable produces groups");
+ immGroupBAgg.Data.Count.Should().BeGreaterThan(1, "GroupOnImmutable(B) produces groups");
+
+ // Flow 6: MergeManyChangeSets — exact price key verification
+ // MMCS(child/A) and MMCS(source/A) see the same sourceA markets, same price keys
+ childPriceCache.Keys.Should().BeEquivalentTo(sourcePriceACache.Keys,
+ "MMCS(child/A) = MMCS(source/A) — same source markets, same price keys");
+ childPriceBCache.Keys.Should().BeEquivalentTo(sourcePriceCache.Keys,
+ "MMCS(child/B) = MMCS(source/B) — same source markets, same price keys");
+
+ // Flow 7: SortAndBind — exact count matching sourceA
+ boundListA.Count.Should().Be(leftJoinCache.Count, "SortAndBind = LeftJoin count (both see all sourceA)");
+ boundListB.Count.Should().Be(leftJoinCache.Count, "SortAndBind(obs) = LeftJoin count");
+ for (var i = 1; i < boundListB.Count; i++)
+ boundListB[i - 1].Priority.Should().BeLessThanOrEqualTo(boundListB[i].Priority,
+ "SortAndBind(obs) re-sorted by priority after comparer switch");
+
+ // Switch: after switching, should have items from the switched-to source
+ switchCache.Count.Should().BeGreaterThan(0, "Switch (switched to B) has items");
+ switchCache2.Count.Should().BeGreaterThan(0, "Switch2 (switched to A) has items");
+
+ pageBCache.Count.Should().BeGreaterThan(0, "Page(B) produces results");
+ pageBCache.Count.Should().BeLessThanOrEqualTo(pageSize, "Page(B) respects page limit");
+ virtBCache.Count.Should().BeGreaterThan(0, "Virtualise(B) produces results");
+ virtBCache.Count.Should().BeLessThanOrEqualTo(virtualSize, "Virtualise(B) respects virtual limit");
+
+ // Flow 8: TransformMany — exact price key sets from original markets
+        var expectedPriceKeysA = new HashSet<int>(marketsA.SelectMany(m => m.Prices.Keys));
+        new HashSet<int>(allPricesACache.Keys).IsSupersetOf(expectedPriceKeysA).Should()
+            .BeTrue("TransformMany(A) contains all original sourceA prices");
+        var expectedPriceKeysB = new HashSet<int>(marketsB.SelectMany(m => m.Prices.Keys));
+        new HashSet<int>(allPricesBCache.Keys).IsSupersetOf(expectedPriceKeysB).Should()
+ .BeTrue("TransformMany(B) contains all original sourceB prices");
+
+ // TransformToTree
+        static int CountAll(IEnumerable<Node<StressMarket, int>> nodes)
+ {
+ var c = 0;
+ foreach (var n in nodes) { c++; c += CountAll(n.Children.Items); }
+ return c;
+ }
+
+ CountAll(treeCache.Items).Should().Be(treeCount, "Tree has all markets across depths");
+ treeCache.Items.Any(n => n.Children.Count > 0).Should().BeTrue("Tree has child nodes");
+ treeCache2.Count.Should().BeGreaterThan(0, "Tree2 produces results");
+
+ // Side chains
+ lastQuery.Should().NotBeNull("QueryWhenChanged(B) fired");
+ lastQueryA.Should().NotBeNull("QueryWhenChanged(A) fired");
+ sortVirtResults.Data.Count.Should().BeLessThanOrEqualTo(virtualSize, "SortAndVirtualize respects limit");
+ sortVirtAResults.Data.Count.Should().BeLessThanOrEqualTo(virtualSize, "SortAndVirtualize(A) respects limit");
+ }
+
+ // ════════════════════════════════════════════════════════════════
+ // Data Generation
+ // ════════════════════════════════════════════════════════════════
+
+    private static List<StressMarket> GenerateMarkets(Randomizer rand, int idStart, int count, string[] regions)
+ {
+        var markets = new List<StressMarket>(count);
+ for (var i = 0; i < count; i++)
+ {
+ var id = idStart + i;
+ var market = new StressMarket(
+ id, $"Market-{id}",
+ regions[rand.Number(0, regions.Length - 1)],
+ rand.Number(PriorityMin, PriorityMax),
+ rand.Double(RatingMin, RatingMax));
+
+ var priceCount = rand.Number(PricesPerMarketMin, PricesPerMarketMax);
+ market.Prices.Edit(u =>
+ {
+ for (var p = 0; p < priceCount; p++)
+ u.AddOrUpdate(new StressPrice(id * 1000 + p, id, rand.Decimal(PriceMin, PriceMax)));
+ });
+
+ markets.Add(market);
+ }
+
+ return markets;
+ }
+
+    private static List<StressMarket> GenerateTreeMarkets(Randomizer rand, int idStart, int count, string[] regions)
+ {
+        var markets = new List<StressMarket>(count);
+ var rootCount = Math.Max(2, count / 3);
+
+ for (var i = 0; i < rootCount; i++)
+ markets.Add(new StressMarket(idStart + i, $"Tree-Root-{i}",
+ regions[rand.Number(0, regions.Length - 1)],
+ rand.Number(PriorityMin, PriorityMax),
+ rand.Double(RatingMin, RatingMax)));
+
+ for (var i = rootCount; i < count; i++)
+ {
+ var parentIdx = rand.Number(0, i - 1);
+ markets.Add(new StressMarket(idStart + i, $"Tree-Child-{i}",
+ regions[rand.Number(0, regions.Length - 1)],
+ rand.Number(PriorityMin, PriorityMax),
+ rand.Double(RatingMin, RatingMax),
+ markets[parentIdx].Id));
+ }
+
+ return markets;
+ }
+
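+    // Round-robin partition: distributes items evenly so each writer thread gets a
+    // deterministic, non-overlapping slice of the generated markets.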
+    private static List<List<T>> PartitionList<T>(List<T> source, int partitions)
+ {
+        var result = Enumerable.Range(0, partitions).Select(_ => new List<T>()).ToList();
+ for (var i = 0; i < source.Count; i++)
+ result[i % partitions].Add(source[i]);
+ return result;
+ }
+}
diff --git a/src/DynamicData.Tests/Cache/MergeManyChangeSetsCacheSourceCompareFixture.cs b/src/DynamicData.Tests/Cache/MergeManyChangeSetsCacheSourceCompareFixture.cs
index 33db06f0..ce6cc89f 100644
--- a/src/DynamicData.Tests/Cache/MergeManyChangeSetsCacheSourceCompareFixture.cs
+++ b/src/DynamicData.Tests/Cache/MergeManyChangeSetsCacheSourceCompareFixture.cs
@@ -5,6 +5,7 @@
using System.Reactive.Disposables;
using System.Reactive;
using System.Reactive.Linq;
+using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;
using Bogus;
using DynamicData.Kernel;
@@ -90,9 +91,11 @@ IObservable AddRemovePrices(Market market, int priceCount, int para
.Parallelize(priceCount, parallel, obs => obs.StressAddRemove(market.PricesCache, _ => GetRemoveTime(), scheduler))
.Finally(market.PricesCache.Dispose);
- var merged = _marketCache.Connect().MergeManyChangeSets(market => market.LatestPrices, Market.RatingCompare, resortOnSourceRefresh: true);
+ var merged = _marketCache.Connect().MergeManyChangeSets(market => market.LatestPrices, Market.RatingCompare, resortOnSourceRefresh: true).Publish();
var adding = true;
+ var cacheCompleted = merged.LastOrDefaultAsync().ToTask();
using var priceResults = merged.AsAggregator();
+ using var connect = merged.Connect();
// Start asynchrononously modifying the parent list and the child lists
using var addingSub = AddRemoveStress(marketCount, priceCount, Environment.ProcessorCount, TaskPoolScheduler.Default)
@@ -118,6 +121,9 @@ IObservable AddRemovePrices(Market market, int priceCount, int para
}
while (adding);
+ // Wait for the source cache to finish delivering all notifications.
+ await cacheCompleted;
+
// Verify the results
CheckResultContents(_marketCacheResults, priceResults, Market.RatingCompare);
}
diff --git a/src/DynamicData.Tests/Cache/SourceCacheFixture.cs b/src/DynamicData.Tests/Cache/SourceCacheFixture.cs
index a1380137..ca46b77f 100644
--- a/src/DynamicData.Tests/Cache/SourceCacheFixture.cs
+++ b/src/DynamicData.Tests/Cache/SourceCacheFixture.cs
@@ -1,6 +1,9 @@
using System;
+using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
+using System.Threading;
+using System.Threading.Tasks;
using DynamicData.Tests.Domain;
@@ -188,4 +191,168 @@ public void StaticFilterRemove()
public record class SomeObject(int Id, int Value);
+
+ [Fact]
+ public async Task MultiCacheFanInDoesNotDeadlock()
+ {
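+        // Fan-in: two independent caches populate a single destination from two
+        // concurrent writers; verifies the shared downstream subscription neither
+        // deadlocks nor drops changes.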
+ const int itemCount = 100;
+
+        using var cacheA = new SourceCache<TestItem, string>(static x => x.Key);
+        using var cacheB = new SourceCache<TestItem, string>(static x => x.Key);
+        using var destination = new SourceCache<TestItem, string>(static x => x.Key);
+ using var subA = cacheA.Connect().PopulateInto(destination);
+ using var subB = cacheB.Connect().PopulateInto(destination);
+ using var results = destination.Connect().AsAggregator();
+
+ var taskA = Task.Run(() =>
+ {
+ for (var i = 0; i < itemCount; i++)
+ {
+ cacheA.AddOrUpdate(new TestItem($"a-{i}", $"ValueA-{i}"));
+ }
+ });
+
+ var taskB = Task.Run(() =>
+ {
+ for (var i = 0; i < itemCount; i++)
+ {
+ cacheB.AddOrUpdate(new TestItem($"b-{i}", $"ValueB-{i}"));
+ }
+ });
+
+ var completed = Task.WhenAll(taskA, taskB);
+ var finished = await Task.WhenAny(completed, Task.Delay(TimeSpan.FromSeconds(10)));
+
+ finished.Should().BeSameAs(completed, "concurrent edits with cross-cache subscribers should not deadlock");
+ results.Error.Should().BeNull();
+ results.Data.Count.Should().Be(itemCount * 2, "all items from both caches should arrive in the destination");
+ results.Data.Items.Should().BeEquivalentTo([.. cacheA.Items, .. cacheB.Items], "all items should be in the destination");
+ }
+
+ [Fact]
+ public async Task DirectCrossWriteDoesNotDeadlock()
+ {
+ const int iterations = 100;
+
+ for (var iter = 0; iter < iterations; iter++)
+ {
+            using var cacheA = new SourceCache<TestItem, string>(static x => x.Key);
+            using var cacheB = new SourceCache<TestItem, string>(static x => x.Key);
+
+ // Bidirectional: A items flow into B, B items flow into A.
+ // Filter by prefix prevents infinite feedback.
+ using var aToB = cacheA.Connect()
+ .Filter(static x => x.Key.StartsWith('a'))
+ .Transform(static (item, _) => new TestItem("from-a-" + item.Key, item.Value))
+ .PopulateInto(cacheB);
+
+ using var bToA = cacheB.Connect()
+ .Filter(static x => x.Key.StartsWith('b'))
+ .Transform(static (item, _) => new TestItem("from-b-" + item.Key, item.Value))
+ .PopulateInto(cacheA);
+
+ using var barrier = new Barrier(2);
+
+ var taskA = Task.Run(() =>
+ {
+ barrier.SignalAndWait();
+ for (var i = 0; i < 1000; i++)
+ {
+ cacheA.AddOrUpdate(new TestItem("a" + i, "V" + i));
+ }
+ });
+
+ var taskB = Task.Run(() =>
+ {
+ barrier.SignalAndWait();
+ for (var i = 0; i < 1000; i++)
+ {
+ cacheB.AddOrUpdate(new TestItem("b" + i, "V" + i));
+ }
+ });
+
+ var completed = Task.WhenAll(taskA, taskB);
+ var finished = await Task.WhenAny(completed, Task.Delay(TimeSpan.FromSeconds(30)));
+
+ finished.Should().BeSameAs(completed, $"iteration {iter}: bidirectional cross-cache writes should not deadlock");
+ }
+ }
+
+ [Fact]
+ public void ConnectDuringDeliveryDoesNotDuplicate()
+ {
+ // Exploits the dequeue-to-OnNext window. Thread A writes two items in
+ // separate batches. The first delivery is held by a slow subscriber.
+ // While item1 delivery is blocked, item2 is committed to ReaderWriter
+ // and sitting in the queue. Thread B calls Connect(), takes a snapshot
+ // (sees both items), subscribes to _changes, then item2 is delivered
+ // via OnNext — producing a duplicate if not guarded by a generation counter.
+        using var cache = new SourceCache<TestItem, string>(static x => x.Key);
+
+ using var delivering = new ManualResetEventSlim(false);
+ using var item2Written = new ManualResetEventSlim(false);
+ using var connectDone = new ManualResetEventSlim(false);
+
+ var firstDelivery = true;
+
+ // First subscriber: blocks on the first delivery to create the window
+ using var slowSub = cache.Connect().Subscribe(_ =>
+ {
+ if (firstDelivery)
+ {
+ firstDelivery = false;
+ delivering.Set();
+
+ // Wait until item2 has been written and the Connect has subscribed
+ connectDone.Wait(TimeSpan.FromSeconds(5));
+ }
+ });
+
+ // Write item1 on a background thread — delivery starts, slow subscriber blocks
+ var writeTask = Task.Run(() =>
+ {
+ cache.AddOrUpdate(new TestItem("k1", "v1"));
+ });
+
+ // Wait for delivery of item1 to be in progress (slow sub is blocking)
+ delivering.Wait(TimeSpan.FromSeconds(5)).Should().BeTrue("delivery should have started");
+
+ // Now write item2 on another thread. It will acquire the lock, commit to
+ // ReaderWriter, enqueue a notification, and return. The notification sits
+ // in the queue because the deliverer (Thread A) is blocked by the slow sub.
+ var writeTask2 = Task.Run(() =>
+ {
+ cache.AddOrUpdate(new TestItem("k2", "v2"));
+ item2Written.Set();
+ });
+ item2Written.Wait(TimeSpan.FromSeconds(5)).Should().BeTrue("item2 should have been written");
+
+ // Now Connect on the main thread. The snapshot from ReaderWriter includes
+ // BOTH k1 and k2. The subscription to _changes is added. When the slow
+ // subscriber unblocks, item2's notification will be delivered via OnNext
+ // and the new subscriber will see k2 again — a duplicate Add.
+        var addCounts = new Dictionary<string, int>();
+ using var newSub = cache.Connect().Subscribe(changes =>
+ {
+ foreach (var c in changes)
+ {
+ if (c.Reason == ChangeReason.Add)
+ {
+ var key = c.Current.Key;
+ addCounts[key] = addCounts.GetValueOrDefault(key) + 1;
+ }
+ }
+ });
+
+ // Unblock the slow subscriber — delivery resumes, item2 delivered
+ connectDone.Set();
+ writeTask.Wait(TimeSpan.FromSeconds(5)).Should().BeTrue("writeTask should complete");
+ writeTask2.Wait(TimeSpan.FromSeconds(5)).Should().BeTrue("writeTask2 should complete");
+
+ // Each key should appear exactly once in the new subscriber's view
+ addCounts.GetValueOrDefault("k1").Should().Be(1, "k1 should appear once (snapshot only)");
+ addCounts.GetValueOrDefault("k2").Should().Be(1, "k2 should appear once, not duplicated from snapshot + queued delivery");
+ }
+
+ private sealed record TestItem(string Key, string Value);
}
diff --git a/src/DynamicData.Tests/Cache/SuspendNotificationsFixture.cs b/src/DynamicData.Tests/Cache/SuspendNotificationsFixture.cs
index e73de850..db39604a 100644
--- a/src/DynamicData.Tests/Cache/SuspendNotificationsFixture.cs
+++ b/src/DynamicData.Tests/Cache/SuspendNotificationsFixture.cs
@@ -2,6 +2,7 @@
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
+using System.Threading;
using System.Threading.Tasks;
using DynamicData.Kernel;
using FluentAssertions;
@@ -352,6 +353,234 @@ public async Task SuspensionsAreThreadSafe()
_results.Messages[0].Adds.Should().Be(100, "Should have 100 adds");
}
+ [Fact]
+ public void ResumeThenReSuspendDeliversFirstBatchOnly()
+ {
+ // Forces the ordering: resume completes before re-suspend.
+ // The deferred subscriber activates with the first batch snapshot,
+ // then re-suspend holds the second batch until final resume.
+        using var cache = new SourceCache<int, int>(static x => x);
+ var dataSet1 = Enumerable.Range(0, 100).ToList();
+ var dataSet2 = Enumerable.Range(1000, 100).ToList();
+ var allData = dataSet1.Concat(dataSet2).ToList();
+
+ var suspend1 = cache.SuspendNotifications();
+ cache.AddOrUpdate(dataSet1);
+
+ using var results = cache.Connect().AsAggregator();
+ results.Messages.Count.Should().Be(0, "no messages during suspension");
+
+ // Resume first — subscriber activates
+ suspend1.Dispose();
+
+ results.Messages.Count.Should().Be(1, "exactly one message after resume");
+ results.Messages[0].Adds.Should().Be(dataSet1.Count, $"snapshot should have {dataSet1.Count} adds");
+ results.Messages[0].Removes.Should().Be(0, "no removes");
+ results.Messages[0].Updates.Should().Be(0, "no updates");
+ results.Messages[0].Select(x => x.Key).Should().Equal(dataSet1, "snapshot should contain first batch keys");
+
+ // Re-suspend, write second batch
+ var suspend2 = cache.SuspendNotifications();
+ cache.AddOrUpdate(dataSet2);
+
+ results.Messages.Count.Should().Be(1, "still one message — second batch held by suspension");
+ results.Summary.Overall.Adds.Should().Be(dataSet1.Count, $"still {dataSet1.Count} adds total");
+
+ // Final resume
+ suspend2.Dispose();
+
+ results.Messages.Count.Should().Be(2, "two messages total");
+ results.Messages[1].Adds.Should().Be(dataSet2.Count, $"second message has {dataSet2.Count} adds");
+ results.Messages[1].Removes.Should().Be(0, "no removes in second message");
+ results.Messages[1].Updates.Should().Be(0, "no updates in second message");
+ results.Messages[1].Select(x => x.Key).Should().Equal(dataSet2, "second message should contain second batch keys");
+
+ results.Summary.Overall.Adds.Should().Be(allData.Count, $"exactly {allData.Count} adds total");
+ results.Summary.Overall.Removes.Should().Be(0, "no removes");
+ results.Data.Count.Should().Be(allData.Count, $"{allData.Count} items in final state");
+ results.Error.Should().BeNull();
+ results.IsCompleted.Should().BeFalse();
+ }
+
+ [Fact]
+ public void ReSuspendThenResumeDeliversAllInSingleBatch()
+ {
+ // Forces the ordering: re-suspend before resume.
+ // Suspend count goes 1→2→1, no resume signal fires.
+ // Both batches accumulate and arrive as a single changeset on final resume.
+        using var cache = new SourceCache<int, int>(static x => x);
+ var dataSet1 = Enumerable.Range(0, 100).ToList();
+ var dataSet2 = Enumerable.Range(1000, 100).ToList();
+ var allData = dataSet1.Concat(dataSet2).ToList();
+
+ var suspend1 = cache.SuspendNotifications();
+ cache.AddOrUpdate(dataSet1);
+
+ using var results = cache.Connect().AsAggregator();
+ results.Messages.Count.Should().Be(0, "no messages during suspension");
+
+ // Re-suspend first — count goes 1→2
+ var suspend2 = cache.SuspendNotifications();
+
+ // Resume first suspend — count goes 2→1, still suspended
+ suspend1.Dispose();
+
+ results.Messages.Count.Should().Be(0, "no messages — still suspended (count=1)");
+ results.Summary.Overall.Adds.Should().Be(0, "no adds — still suspended");
+
+ // Write second batch while still suspended
+ cache.AddOrUpdate(dataSet2);
+
+ results.Messages.Count.Should().Be(0, "still no messages");
+
+ // Final resume — count goes 1→0
+ suspend2.Dispose();
+
+ results.Messages.Count.Should().Be(1, "single message with all data");
+ results.Messages[0].Adds.Should().Be(allData.Count, $"all {allData.Count} items in one changeset");
+ results.Messages[0].Removes.Should().Be(0, "no removes");
+ results.Messages[0].Updates.Should().Be(0, "no updates");
+ results.Messages[0].Select(c => c.Key).OrderBy(k => k).Should().Equal(allData, "should contain both batches in order");
+
+ results.Summary.Overall.Adds.Should().Be(allData.Count, $"exactly {allData.Count} adds total");
+ results.Summary.Overall.Removes.Should().Be(0, "no removes");
+ results.Summary.Overall.Updates.Should().Be(0, "no updates");
+ results.Data.Count.Should().Be(allData.Count, $"{allData.Count} items in final state");
+ results.Error.Should().BeNull();
+ results.IsCompleted.Should().BeFalse();
+ }
+
+ [Fact]
+ public async Task ConcurrentSuspendDuringResumeDoesNotCorrupt()
+ {
+ // Stress test: races resume against re-suspend on two threads.
+ // Both orderings are correct (tested deterministically above).
+ // This test verifies no corruption, deadlocks, or data loss under contention.
+ const int iterations = 200;
+ var dataSet1 = Enumerable.Range(0, 100).ToList();
+ var dataSet2 = Enumerable.Range(1000, 100).ToList();
+ var allData = dataSet1.Concat(dataSet2).ToList();
+
+ for (var iter = 0; iter < iterations; iter++)
+ {
+            using var cache = new SourceCache<int, int>(static x => x);
+
+ var suspend1 = cache.SuspendNotifications();
+ cache.AddOrUpdate(dataSet1);
+ using var results = cache.Connect().AsAggregator();
+
+ using var barrier = new Barrier(2);
+ var resumeTask = Task.Run(() =>
+ {
+ barrier.SignalAndWait();
+ suspend1.Dispose();
+ });
+
+ var reSuspendTask = Task.Run(() =>
+ {
+ barrier.SignalAndWait();
+ return cache.SuspendNotifications();
+ });
+
+ await Task.WhenAll(resumeTask, reSuspendTask);
+ var suspend2 = await reSuspendTask;
+
+ cache.AddOrUpdate(dataSet2);
+ suspend2.Dispose();
+
+ results.Summary.Overall.Adds.Should().Be(allData.Count, $"iteration {iter}: exactly {allData.Count} adds");
+ results.Summary.Overall.Removes.Should().Be(0, $"iteration {iter}: no removes");
+ results.Summary.Overall.Updates.Should().Be(0, $"iteration {iter}: no updates because keys don't overlap");
+ results.Data.Count.Should().Be(allData.Count, $"iteration {iter}: {allData.Count} items in final state");
+ results.Data.Keys.OrderBy(k => k).Should().Equal(allData, $"iteration {iter}: all keys present in order");
+ results.Error.Should().BeNull($"iteration {iter}: no errors");
+ results.IsCompleted.Should().BeFalse($"iteration {iter}: not completed");
+ }
+ }
+
+ [Fact]
+ public async Task ResumeSignalUnderLockPreventsStaleSnapshotFromReSuspend()
+ {
+ // Verifies that a deferred Connect subscriber never sees data written during
+ // a re-suspension. The resume signal fires under the lock (reentrant), so the
+ // deferred subscriber activates and takes its snapshot before any other thread
+ // can re-suspend or write new data.
+ //
+ // A slow first subscriber blocks delivery of accumulated changes, creating a
+ // window where the main thread re-suspends and writes a second batch. The
+ // deferred subscriber's snapshot must contain only the first batch.
+        using var cache = new SourceCache<int, int>(static x => x);
+ var dataSet1 = Enumerable.Range(0, 100).ToList();
+ var dataSet2 = Enumerable.Range(1000, 100).ToList();
+ var allData = dataSet1.Concat(dataSet2).ToList();
+
+ using var delivering = new SemaphoreSlim(0, 1);
+ using var proceedWithResuspend = new SemaphoreSlim(0, 1);
+
+ var suspend1 = cache.SuspendNotifications();
+ cache.AddOrUpdate(dataSet1);
+
+ // First subscriber blocks on delivery to hold the delivery thread
+ var firstDelivery = true;
+ using var slowSub = cache.Connect().Subscribe(_ =>
+ {
+ if (firstDelivery)
+ {
+ firstDelivery = false;
+ delivering.Release();
+ proceedWithResuspend.Wait(TimeSpan.FromSeconds(5));
+ }
+ });
+
+ // Deferred subscriber — will activate when resume signal fires
+ using var results = cache.Connect().AsAggregator();
+ results.Messages.Count.Should().Be(0, "no messages during suspension");
+
+ // Resume on background thread — delivery blocks on slow subscriber
+ var resumeTask = Task.Run(() => suspend1.Dispose());
+ (await delivering.WaitAsync(TimeSpan.FromSeconds(5))).Should().BeTrue("delivery should have started");
+
+ // Re-suspend and write second batch while delivery is blocked
+ var suspend2 = cache.SuspendNotifications();
+ cache.AddOrUpdate(dataSet2);
+
+ // dataSet2 must not appear in any message received so far
+ foreach (var msg in results.Messages)
+ {
+ foreach (var change in msg)
+ {
+ change.Key.Should().BeInRange(0, 99,
+ "deferred subscriber should only have first-batch keys before second resume");
+ }
+ }
+
+ // Unblock delivery
+ proceedWithResuspend.Release();
+ await resumeTask;
+
+ // Only dataSet1 should have been delivered — dataSet2 is held by second suspension
+ results.Summary.Overall.Adds.Should().Be(dataSet1.Count,
+ $"exactly {dataSet1.Count} adds before second resume — dataSet2 must be held by suspension");
+ results.Messages.Should().HaveCount(1, "exactly one message (snapshot of dataSet1)");
+ results.Messages[0].Adds.Should().Be(dataSet1.Count);
+ results.Messages[0].Select(c => c.Key).Should().Equal(dataSet1,
+ "snapshot should contain exactly first-batch keys in order");
+
+ // Resume second suspension — dataSet2 arrives now
+ suspend2.Dispose();
+
+ results.Summary.Overall.Adds.Should().Be(allData.Count, $"exactly {allData.Count} adds total");
+ results.Summary.Overall.Removes.Should().Be(0, "no removes");
+ results.Messages.Should().HaveCount(2, "two messages: snapshot + second batch");
+ results.Messages[1].Adds.Should().Be(dataSet2.Count);
+ results.Messages[1].Select(c => c.Key).Should().Equal(dataSet2,
+ "second message should contain exactly second-batch keys in order");
+ results.Data.Count.Should().Be(allData.Count);
+ results.Data.Keys.OrderBy(k => k).Should().Equal(allData);
+ results.Error.Should().BeNull();
+ results.IsCompleted.Should().BeFalse();
+ }
+
public void Dispose()
{
_source.Dispose();
diff --git a/src/DynamicData.Tests/Internal/CacheParentSubscriptionFixture.cs b/src/DynamicData.Tests/Internal/CacheParentSubscriptionFixture.cs
new file mode 100644
index 00000000..d8d4273b
--- /dev/null
+++ b/src/DynamicData.Tests/Internal/CacheParentSubscriptionFixture.cs
@@ -0,0 +1,375 @@
+// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved.
+// Roland Pheasant licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for full license information.
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Reactive.Subjects;
+using System.Threading;
+using System.Threading.Tasks;
+
+using Bogus;
+
+using DynamicData.Internal;
+using DynamicData.Tests.Utilities;
+
+using FluentAssertions;
+
+using Xunit;
+
+namespace DynamicData.Tests.Internal;
+
+/// <summary>
+/// Tests for the CacheParentSubscription
+/// behavioral contracts using a minimal concrete subclass.
+/// </summary>
+public sealed class CacheParentSubscriptionFixture
+{
+ private const int SeedMin = 1;
+ private const int SeedMax = 10000;
+ private const int BatchSizeMin = 2;
+ private const int BatchSizeMax = 8;
+
+ private readonly Randomizer _rand = new(55);
+
+ /// Test item with a typed key — no string parsing.
+ private sealed record TestItem(int Key, string Value);
+
+ [Fact]
+ public void ParentOnNext_CalledForEachChangeSet()
+ {
+ var itemCount = _rand.Number(BatchSizeMin, BatchSizeMax);
+        using var source = new SourceCache<TestItem, int>(x => x.Key);
+ var observer = new TestObserver();
+ using var sub = new TestSubscription(observer);
+ sub.ExposeCreateParent(source.Connect());
+
+ var items = Enumerable.Range(0, itemCount)
+ .Select(i => new TestItem(_rand.Number(SeedMin, SeedMax) + i * 100, _rand.String2(_rand.Number(3, 10))))
+ .ToList();
+
+ foreach (var item in items)
+ source.AddOrUpdate(item);
+
+ sub.ParentCallCount.Should().Be(items.Count, "ParentOnNext should fire once per changeset");
+ observer.EmitCount.Should().Be(items.Count, "EmitChanges should fire after each parent update");
+ }
+
+ [Fact]
+ public void ChildOnNext_CalledForEachEmission()
+ {
+ using var source = new SourceCache(x => x.Key);
+        var childSubjects = new List<Subject<string>>();
+ var observer = new TestObserver();
+ using var sub = new TestSubscription(observer, key =>
+ {
+            var subj = new Subject<string>();
+ childSubjects.Add(subj);
+ return subj;
+ });
+ sub.ExposeCreateParent(source.Connect());
+
+ var key = _rand.Number(SeedMin, SeedMax);
+ source.AddOrUpdate(new TestItem(key, "parent"));
+
+ childSubjects.Should().HaveCount(1);
+ var childValue = _rand.String2(_rand.Number(5, 15));
+ childSubjects[0].OnNext(childValue);
+
+ sub.ChildCalls.Should().ContainSingle()
+ .Which.Should().Be((childValue, key));
+ }
+
+ [Fact]
+ public void EmitChanges_FiresOnceForBatch()
+ {
+ var batchSize = _rand.Number(BatchSizeMin, BatchSizeMax);
+        using var source = new SourceCache<TestItem, int>(x => x.Key);
+ var observer = new TestObserver();
+ using var sub = new TestSubscription(observer);
+ sub.ExposeCreateParent(source.Connect());
+
+ source.Edit(updater =>
+ {
+ for (var i = 0; i < batchSize; i++)
+ updater.AddOrUpdate(new TestItem(i + 1, _rand.String2(_rand.Number(3, 8))));
+ });
+
+ sub.ParentCallCount.Should().Be(1, "single batch = single ParentOnNext");
+ sub.EmitCallCount.Should().Be(1, "single batch = single EmitChanges");
+ }
+
+ [Fact]
+ public void Batching_ChildUpdatesSettleBeforeEmit()
+ {
+ var batchSize = _rand.Number(BatchSizeMin, BatchSizeMax);
+        using var source = new SourceCache<TestItem, int>(x => x.Key);
+ var observer = new TestObserver();
+ var childCount = 0;
+ using var sub = new TestSubscription(observer, key =>
+ {
+ Interlocked.Increment(ref childCount);
+            return new BehaviorSubject<string>($"sync-{key}");
+ });
+ sub.ExposeCreateParent(source.Connect());
+
+ source.Edit(updater =>
+ {
+ for (var i = 0; i < batchSize; i++)
+ updater.AddOrUpdate(new TestItem(i + 1, _rand.String2(_rand.Number(3, 8))));
+ });
+
+ childCount.Should().Be(batchSize, "each item should create a child");
+ sub.EmitCallCount.Should().BeGreaterThanOrEqualTo(1,
+ "EmitChanges fires after parent + children settle");
+ }
+
+ [Fact]
+ public void Completion_RequiresParentAndAllChildren()
+ {
+ using var source = new SourceCache<TestItem, int>(x => x.Key);
+ var childSubjects = new List<Subject<string>>();
+ var observer = new TestObserver();
+ using var sub = new TestSubscription(observer, key =>
+ {
+ var subj = new Subject<string>();
+ childSubjects.Add(subj);
+ return subj;
+ });
+ sub.ExposeCreateParent(source.Connect());
+
+ source.AddOrUpdate(new TestItem(_rand.Number(SeedMin, SeedMax), "item"));
+ childSubjects.Should().HaveCount(1);
+
+ source.Dispose();
+ observer.IsCompleted.Should().BeFalse("parent complete but child still active");
+
+ childSubjects[0].OnCompleted();
+ observer.IsCompleted.Should().BeTrue("OnCompleted fires when parent + all children complete");
+ }
+
+ [Fact]
+ public void Completion_ParentOnly_NoChildren()
+ {
+ using var source = new SourceCache<TestItem, int>(x => x.Key);
+ var observer = new TestObserver();
+ using var sub = new TestSubscription(observer);
+ sub.ExposeCreateParent(source.Connect());
+
+ source.Dispose();
+ observer.IsCompleted.Should().BeTrue("immediate OnCompleted when no children");
+ }
+
+ [Fact]
+ public void Disposal_StopsAllEmissions()
+ {
+ using var source = new SourceCache<TestItem, int>(x => x.Key);
+ var childSubjects = new List<Subject<string>>();
+ var observer = new TestObserver();
+ var sub = new TestSubscription(observer, key =>
+ {
+ var subj = new Subject<string>();
+ childSubjects.Add(subj);
+ return subj;
+ });
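+ // Note: 'sub' is intentionally not declared with 'using'; it is disposed explicitly mid-test to prove emissions stop.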
+ sub.ExposeCreateParent(source.Connect());
+
+ source.AddOrUpdate(new TestItem(_rand.Number(SeedMin, SeedMax), "item"));
+ var emitsBefore = observer.EmitCount;
+
+ sub.Dispose();
+
+ source.AddOrUpdate(new TestItem(_rand.Number(SeedMin + SeedMax, SeedMax * 2), "after"));
+ if (childSubjects.Count > 0)
+ childSubjects[0].OnNext("after-dispose");
+
+ observer.EmitCount.Should().Be(emitsBefore, "no emissions after disposal");
+ }
+
+ [Fact]
+ public void Error_Propagates()
+ {
+ using var source = new TestSourceCache<TestItem, int>(x => x.Key);
+ var observer = new TestObserver();
+ using var sub = new TestSubscription(observer);
+ sub.ExposeCreateParent(source.Connect());
+
+ var error = new InvalidOperationException("test error");
+ source.SetError(error);
+
+ observer.Error.Should().BeSameAs(error);
+ }
+
+ [Fact]
+ public void Serialization_ParentAndChildDoNotInterleave()
+ {
+ using var source = new SourceCache<TestItem, int>(x => x.Key);
+ var callLog = new List<string>();
+ var observer = new TestObserver();
+ using var sub = new TestSubscription(
+ observer,
+ key =>
+ {
+ var subj = new Subject<string>();
+ return subj;
+ },
+ onParent: () => { lock (callLog) callLog.Add("P-start"); Thread.Sleep(1); lock (callLog) callLog.Add("P-end"); },
+ onChild: () => { lock (callLog) callLog.Add("C-start"); Thread.Sleep(1); lock (callLog) callLog.Add("C-end"); });
+ sub.ExposeCreateParent(source.Connect());
+
+ source.AddOrUpdate(new TestItem(_rand.Number(SeedMin, SeedMax), "item"));
+
+ // Start/end pairs should not interleave
+ for (var i = 0; i + 1 < callLog.Count; i += 2)
+ {
+ var prefix = callLog[i].Split('-')[0];
+ callLog[i + 1].Should().StartWith(prefix, "operations should not interleave");
+ }
+ }
+
+ /// <summary>
+ /// Proves CPS delivery runs without holding the lock. Two TestSubscription instances
+ /// whose EmitChanges callbacks write into each other's source cache — creating a
+ /// cross-cache cycle. Deadlocks on unfixed code, passes after the fix.
+ /// </summary>
+ [Trait("Category", "ExplicitDeadlock")]
+ [Fact]
+ public async Task DeadlockProof_CrossFeedingSubscriptions()
+ {
+ var iterations = _rand.Number(50, 150);
+
+ using var sourceA = new SourceCache<TestItem, int>(x => x.Key);
+ using var sourceB = new SourceCache<TestItem, int>(x => x.Key);
+
+ // Each TestSubscription's EmitChanges writes into the OTHER source (limited to prevent infinite loops)
+ var observerA = new CrossFeedObserver(sourceB, 100_001, iterations);
+ using var subA = new TestSubscription(observerA);
+ subA.ExposeCreateParent(sourceA.Connect());
+
+ var observerB = new CrossFeedObserver(sourceA, 200_001, iterations);
+ using var subB = new TestSubscription(observerB);
+ subB.ExposeCreateParent(sourceB.Connect());
+
+ using var barrier = new Barrier(2);
+
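+ // Both writers wait on the barrier so they start flooding their caches at the same instant, maximising contention.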
+ var taskA = Task.Run(() =>
+ {
+ var tRand = new Randomizer(56);
+ barrier.SignalAndWait();
+ for (var i = 0; i < iterations; i++)
+ sourceA.AddOrUpdate(new TestItem(tRand.Number(1, 50_000), tRand.String2(5)));
+ });
+
+ var taskB = Task.Run(() =>
+ {
+ var tRand = new Randomizer(57);
+ barrier.SignalAndWait();
+ for (var i = 0; i < iterations; i++)
+ sourceB.AddOrUpdate(new TestItem(tRand.Number(50_001, 100_000), tRand.String2(5)));
+ });
+
+ var completed = Task.WhenAll(taskA, taskB);
+ var finished = await Task.WhenAny(completed, Task.Delay(TimeSpan.FromSeconds(30)));
+ finished.Should().BeSameAs(completed,
+ "cross-feeding CacheParentSubscriptions should not deadlock");
+ }
+
+ // ═══════════════════════════════════════════════════════════════
+ // Test Infrastructure
+ // ═══════════════════════════════════════════════════════════════
+
+ /// <summary>Observer that writes into another cache on every emission — creates cross-cache cycle.</summary>
+ private sealed class CrossFeedObserver(SourceCache<TestItem, int> target, int idBase, int maxCrossWrites) : IObserver<IChangeSet<TestItem, int>>
+ {
+ private int _counter;
+
+ public void OnNext(IChangeSet<TestItem, int> value)
+ {
+ // Limit cross-writes to prevent infinite feedback loops
+ if (Interlocked.Increment(ref _counter) <= maxCrossWrites)
+ {
+ target.AddOrUpdate(new TestItem(idBase + _counter, "cross"));
+ }
+ }
+
+ public void OnError(Exception error) { }
+
+ public void OnCompleted() { }
+ }
+
+ /// <summary>
+ /// Minimal concrete CacheParentSubscription for testing.
+ /// </summary>
+ private sealed class TestSubscription : CacheParentSubscription<TestItem, int, string, IChangeSet<TestItem, int>>
+ {
+ private readonly Func<int, IObservable<string>>? _childFactory;
+ private readonly Action? _onParent;
+ private readonly Action? _onChild;
+ private readonly ChangeAwareCache<TestItem, int> _cache = new();
+
+ public int ParentCallCount;
+ public int EmitCallCount;
+ public readonly List<(string Value, int Key)> ChildCalls = [];
+
+ public TestSubscription(
+ IObserver<IChangeSet<TestItem, int>> observer,
+ Func<int, IObservable<string>>? childFactory = null,
+ Action? onParent = null,
+ Action? onChild = null)
+ : base(observer)
+ {
+ _childFactory = childFactory;
+ _onParent = onParent;
+ _onChild = onChild;
+ }
+
+ public void ExposeCreateParent(IObservable<IChangeSet<TestItem, int>> source)
+ => CreateParentSubscription(source);
+
+ protected override void ParentOnNext(IChangeSet<TestItem, int> changes)
+ {
+ Interlocked.Increment(ref ParentCallCount);
+ _onParent?.Invoke();
+ _cache.Clone(changes);
+
+ if (_childFactory is not null)
+ {
+ foreach (var change in (ChangeSet<TestItem, int>)changes)
+ {
+ if (change.Reason is ChangeReason.Add or ChangeReason.Update)
+ AddChildSubscription(MakeChildObservable(_childFactory(change.Key)), change.Key);
+ else if (change.Reason is ChangeReason.Remove)
+ RemoveChildSubscription(change.Key);
+ }
+ }
+ }
+
+ protected override void ChildOnNext(string child, int parentKey)
+ {
+ _onChild?.Invoke();
+ ChildCalls.Add((child, parentKey));
+ _cache.AddOrUpdate(new TestItem(parentKey, child), parentKey);
+ }
+
+ protected override void EmitChanges(IObserver<IChangeSet<TestItem, int>> observer)
+ {
+ Interlocked.Increment(ref EmitCallCount);
+ var changes = _cache.CaptureChanges();
+ if (changes.Count > 0)
+ observer.OnNext(changes);
+ }
+ }
+
+ /// <summary>Observer that records emissions, completion, and errors.</summary>
+ private sealed class TestObserver : IObserver<IChangeSet<TestItem, int>>
+ {
+ public int EmitCount;
+ public bool IsCompleted;
+ public Exception? Error;
+
+ public void OnNext(IChangeSet<TestItem, int> value) => Interlocked.Increment(ref EmitCount);
+ public void OnError(Exception error) => Error = error;
+ public void OnCompleted() => IsCompleted = true;
+ }
+}
\ No newline at end of file
diff --git a/src/DynamicData.Tests/Internal/DeliveryQueueFixture.cs b/src/DynamicData.Tests/Internal/DeliveryQueueFixture.cs
new file mode 100644
index 00000000..6c19816d
--- /dev/null
+++ b/src/DynamicData.Tests/Internal/DeliveryQueueFixture.cs
@@ -0,0 +1,560 @@
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+
+using DynamicData.Internal;
+using FluentAssertions;
+using Xunit;
+
+namespace DynamicData.Tests.Internal;
+
+public class DeliveryQueueFixture
+{
+#if NET9_0_OR_GREATER
+ private readonly Lock _gate = new();
+#else
+ private readonly object _gate = new();
+#endif
+
+ /// <summary>Helper observer that captures OnNext items into a list.</summary>
+ private sealed class ListObserver<T> : IObserver<T>
+ {
+ private readonly List<T> _items = new();
+ public IReadOnlyList<T> Items => _items;
+ public Exception? Error { get; private set; }
+ public bool IsCompleted { get; private set; }
+
+ public void OnNext(T value) => _items.Add(value);
+ public void OnError(Exception error) => Error = error;
+ public void OnCompleted() => IsCompleted = true;
+ }
+
+ /// <summary>Thread-safe observer for concurrent tests.</summary>
+ private sealed class ConcurrentObserver<T> : IObserver<T>
+ {
+ private readonly ConcurrentBag<T> _items = new();
+ public ConcurrentBag<T> Items => _items;
+
+ public void OnNext(T value) => _items.Add(value);
+ public void OnError(Exception error) { }
+ public void OnCompleted() { }
+ }
+
+ /// <summary>Thread-safe ordered observer for concurrent tests.</summary>
+ private sealed class ConcurrentQueueObserver<T> : IObserver<T>
+ {
+ private readonly ConcurrentQueue<T> _items = new();
+ public ConcurrentQueue<T> Items => _items;
+
+ public void OnNext(T value) => _items.Enqueue(value);
+ public void OnError(Exception error) { }
+ public void OnCompleted() { }
+ }
+
+ private static void EnqueueAndDeliver<T>(DeliveryQueue<T> queue, T item)
+ {
+ using var scope = queue.AcquireLock();
+ scope.Enqueue(item);
+ }
+
+ private static void TriggerDelivery<T>(DeliveryQueue<T> queue)
+ {
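+ // Taking and releasing the lock scope without enqueuing anything is enough to flush whatever is already queued.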
+ using var scope = queue.AcquireLock();
+ }
+
+ [Fact]
+ public void EnqueueAndDeliverDeliversItem()
+ {
+ var observer = new ListObserver<string>();
+ var queue = new DeliveryQueue<string>(_gate, observer);
+
+ EnqueueAndDeliver(queue, "A");
+
+ observer.Items.Should().Equal("A");
+ }
+
+ [Fact]
+ public void DeliverDeliversItemsInFifoOrder()
+ {
+ var observer = new ListObserver<string>();
+ var queue = new DeliveryQueue<string>(_gate, observer);
+
+ using (var scope = queue.AcquireLock())
+ {
+ scope.Enqueue("A");
+ scope.Enqueue("B");
+ scope.Enqueue("C");
+ }
+
+ observer.Items.Should().Equal("A", "B", "C");
+ }
+
+ [Fact]
+ public void DeliverWithEmptyQueueIsNoOp()
+ {
+ var observer = new ListObserver<string>();
+ var queue = new DeliveryQueue<string>(_gate, observer);
+
+ TriggerDelivery(queue);
+
+ observer.Items.Should().BeEmpty();
+ }
+
+ [Fact]
+ public async Task OnlyOneDelivererAtATime()
+ {
+ var concurrentCount = 0;
+ var maxConcurrent = 0;
+ var deliveryCount = 0;
+ var delivered = new ConcurrentBag<int>();
+ using var firstDeliveryStarted = new ManualResetEventSlim(false);
+ using var allowFirstDeliveryToContinue = new ManualResetEventSlim(false);
+ using var startContenders = new ManualResetEventSlim(false);
+
+ var observer = new BlockingObserver<int>(
+ onNextAction: item =>
+ {
+ var current = Interlocked.Increment(ref concurrentCount);
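+ // Lock-free maximum tracking: the CAS loop below records the highest concurrency level ever observed.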
+ int snapshot;
+ do
+ {
+ snapshot = maxConcurrent;
+ if (current <= snapshot) break;
+ }
+ while (Interlocked.CompareExchange(ref maxConcurrent, current, snapshot) != snapshot);
+
+ delivered.Add(item);
+
+ if (Interlocked.Increment(ref deliveryCount) == 1)
+ {
+ firstDeliveryStarted.Set();
+ allowFirstDeliveryToContinue.Wait();
+ }
+
+ Thread.SpinWait(1000);
+ Interlocked.Decrement(ref concurrentCount);
+ });
+
+ var queue = new DeliveryQueue<int>(_gate, observer);
+
+ var firstDelivery = Task.Run(() => EnqueueAndDeliver(queue, -1));
+ firstDeliveryStarted.Wait();
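+ // With the first delivery parked inside the observer, pile up competing writers plus bare delivery triggers.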
+
+ var enqueueTasks = Enumerable.Range(0, 100)
+ .Select(i => Task.Run(() =>
+ {
+ startContenders.Wait();
+ EnqueueAndDeliver(queue, i);
+ }));
+
+ var triggerTasks = Enumerable.Range(0, 4)
+ .Select(_ => Task.Run(() =>
+ {
+ startContenders.Wait();
+ TriggerDelivery(queue);
+ }));
+
+ var tasks = enqueueTasks.Concat(triggerTasks).ToArray();
+ startContenders.Set();
+ allowFirstDeliveryToContinue.Set();
+
+ await Task.WhenAll(tasks.Append(firstDelivery));
+
+ maxConcurrent.Should().Be(1, "only one thread should be delivering at a time");
+ delivered.Should().HaveCount(101);
+ }
+
+ [Fact]
+ public void SecondWriterItemPickedUpByFirstDeliverer()
+ {
+ var observer = new ListObserver<string>();
+ DeliveryQueue<string>? q = null;
+
+ var enqueuingObserver = new DelegateObserver<string>(item =>
+ {
+ observer.OnNext(item);
+ if (observer.Items.Count == 1)
+ {
+ using var scope = q!.AcquireLock();
+ scope.Enqueue("B");
+ }
+ });
+
+ var queue = new DeliveryQueue<string>(_gate, enqueuingObserver);
+ q = queue;
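+ // "B" is enqueued from inside the observer callback while "A" is still being delivered,
+ // so the same in-flight drain pass is expected to pick it up.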
+
+ EnqueueAndDeliver(queue, "A");
+
+ observer.Items.Should().Equal("A", "B");
+ }
+
+ [Fact]
+ public void ReentrantEnqueueDoesNotRecurse()
+ {
+ var callDepth = 0;
+ var maxDepth = 0;
+ var delivered = new List<string>();
+ DeliveryQueue<string>? q = null;
+
+ var observer = new DelegateObserver<string>(item =>
+ {
+ callDepth++;
+ if (callDepth > maxDepth) maxDepth = callDepth;
+
+ delivered.Add(item);
+
+ if (item == "A")
+ {
+ using var scope = q!.AcquireLock();
+ scope.Enqueue("B");
+ }
+
+ callDepth--;
+ });
+
+ var queue = new DeliveryQueue<string>(_gate, observer);
+ q = queue;
+
+ EnqueueAndDeliver(queue, "A");
+
+ delivered.Should().Equal("A", "B");
+ maxDepth.Should().Be(1, "delivery callback should not recurse");
+ }
+
+ [Fact]
+ public void ExceptionInDeliveryResetsDeliveryToken()
+ {
+ var callCount = 0;
+ var observer = new DelegateObserver<string>(_ =>
+ {
+ if (++callCount == 1)
+ throw new InvalidOperationException("boom");
+ });
+
+ var queue = new DeliveryQueue<string>(_gate, observer);
+
+ var act = () => EnqueueAndDeliver(queue, "A");
+ act.Should().Throw<InvalidOperationException>();
+
+ EnqueueAndDeliver(queue, "B");
+
+ callCount.Should().Be(2, "delivery should work after exception recovery");
+ }
+
+ [Fact]
+ public void RemainingItemsDeliveredAfterExceptionRecovery()
+ {
+ var delivered = new List<string>();
+ var shouldThrow = true;
+ var observer = new DelegateObserver<string>(item =>
+ {
+ if (shouldThrow && item == "A")
+ throw new InvalidOperationException("boom");
+ delivered.Add(item);
+ });
+
+ var queue = new DeliveryQueue<string>(_gate, observer);
+
+ var act = () =>
+ {
+ using var scope = queue.AcquireLock();
+ scope.Enqueue("A");
+ scope.Enqueue("B");
+ };
+
+ act.Should().Throw<InvalidOperationException>();
+
+ shouldThrow = false;
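+ // "A" was dequeued before the observer threw, so only "B" is still pending for the recovery pass.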
+ TriggerDelivery(queue);
+
+ delivered.Should().Equal("B");
+ }
+
+ [Fact]
+ public void TerminalCompletedStopsDelivery()
+ {
+ var observer = new ListObserver<string>();
+ var queue = new DeliveryQueue<string>(_gate, observer);
+
+ using (var scope = queue.AcquireLock())
+ {
+ scope.Enqueue("A");
+ scope.EnqueueCompleted();
+ scope.Enqueue("B"); // should be ignored after terminal
+ }
+
+ observer.Items.Should().Equal("A");
+ observer.IsCompleted.Should().BeTrue();
+ queue.IsTerminated.Should().BeTrue();
+ }
+
+ [Fact]
+ public void TerminalErrorStopsDelivery()
+ {
+ var observer = new ListObserver<string>();
+ var queue = new DeliveryQueue<string>(_gate, observer);
+ var error = new InvalidOperationException("test");
+
+ using (var scope = queue.AcquireLock())
+ {
+ scope.Enqueue("A");
+ scope.EnqueueError(error);
+ scope.Enqueue("B"); // should be ignored after terminal
+ }
+
+ observer.Items.Should().Equal("A");
+ observer.Error.Should().BeSameAs(error);
+ queue.IsTerminated.Should().BeTrue();
+ }
+
+ [Fact]
+ public void EnqueueAfterTerminationIsIgnored()
+ {
+ var observer = new ListObserver<string>();
+ var queue = new DeliveryQueue<string>(_gate, observer);
+
+ using (var scope = queue.AcquireLock())
+ {
+ scope.EnqueueCompleted();
+ }
+
+ EnqueueAndDeliver(queue, "AFTER");
+
+ observer.Items.Should().BeEmpty();
+ }
+
+ [Fact]
+ public void IsTerminatedIsFalseInitially()
+ {
+ var observer = new ListObserver<string>();
+ var queue = new DeliveryQueue<string>(_gate, observer);
+ queue.IsTerminated.Should().BeFalse();
+ }
+
+ [Fact]
+ public async Task ConcurrentEnqueueAllItemsDelivered()
+ {
+ const int threadCount = 8;
+ const int itemsPerThread = 500;
+ var observer = new ConcurrentObserver<int>();
+ var queue = new DeliveryQueue<int>(_gate, observer);
+
+ var tasks = Enumerable.Range(0, threadCount).Select(t => Task.Run(() =>
+ {
+ for (var i = 0; i < itemsPerThread; i++)
+ EnqueueAndDeliver(queue, (t * itemsPerThread) + i);
+ })).ToArray();
+
+ await Task.WhenAll(tasks);
+ TriggerDelivery(queue);
+
+ observer.Items.Count.Should().Be(threadCount * itemsPerThread);
+ }
+
+ [Fact]
+ public async Task ConcurrentEnqueueNoDuplicates()
+ {
+ const int threadCount = 8;
+ const int itemsPerThread = 500;
+ var observer = new ConcurrentObserver<int>();
+ var queue = new DeliveryQueue<int>(_gate, observer);
+
+ var tasks = Enumerable.Range(0, threadCount).Select(t => Task.Run(() =>
+ {
+ for (var i = 0; i < itemsPerThread; i++)
+ EnqueueAndDeliver(queue, (t * itemsPerThread) + i);
+ })).ToArray();
+
+ await Task.WhenAll(tasks);
+ TriggerDelivery(queue);
+
+ observer.Items.Distinct().Count().Should().Be(threadCount * itemsPerThread);
+ }
+
+ [Fact]
+ public async Task ConcurrentEnqueuePreservesPerThreadOrdering()
+ {
+ const int threadCount = 4;
+ const int itemsPerThread = 200;
+ var observer = new ConcurrentQueueObserver<(int Thread, int Seq)>();
+ var queue = new DeliveryQueue<(int Thread, int Seq)>(_gate, observer);
+
+ var tasks = Enumerable.Range(0, threadCount).Select(t => Task.Run(() =>
+ {
+ for (var i = 0; i < itemsPerThread; i++)
+ EnqueueAndDeliver(queue, (t, i));
+ })).ToArray();
+
+ await Task.WhenAll(tasks);
+ TriggerDelivery(queue);
+
+ var itemsByThread = observer.Items.ToArray().GroupBy(x => x.Thread)
+ .ToDictionary(g => g.Key, g => g.Select(x => x.Seq).ToList());
+
+ foreach (var (thread, sequences) in itemsByThread)
+ sequences.Should().BeInAscendingOrder($"items from thread {thread} should preserve enqueue order");
+ }
+
+ /// <summary>Observer that delegates OnNext to an action.</summary>
+ private sealed class DelegateObserver<T>(Action<T> onNextAction) : IObserver<T>
+ {
+ public void OnNext(T value) => onNextAction(value);
+ public void OnError(Exception error) { }
+ public void OnCompleted() { }
+ }
+
+ /// <summary>Observer with blocking capability for concurrency tests.</summary>
+ private sealed class BlockingObserver<T>(Action<T> onNextAction) : IObserver<T>
+ {
+ public void OnNext(T value) => onNextAction(value);
+ public void OnError(Exception error) { }
+ public void OnCompleted() { }
+ }
+
+ [Fact]
+ public void EnsureDeliveryCompleteTerminatesQueue()
+ {
+ var observer = new ListObserver<string>();
+ var queue = new DeliveryQueue<string>(_gate, observer);
+
+ EnqueueAndDeliver(queue, "A");
+ queue.EnsureDeliveryComplete();
+
+ queue.IsTerminated.Should().BeTrue();
+
+ // Further enqueues should be ignored
+ EnqueueAndDeliver(queue, "B");
+ observer.Items.Should().Equal("A");
+ }
+
+ [Fact]
+ public void EnsureDeliveryCompleteClearsPendingItems()
+ {
+ var observer = new ListObserver<string>();
+ var deliveryCount = 0;
+ DeliveryQueue<string>? q = null;
+
+ var blockingObserver = new DelegateObserver<string>(item =>
+ {
+ observer.OnNext(item);
+ if (++deliveryCount == 1)
+ {
+ // While delivering first item, enqueue more then terminate
+ using (var scope = q!.AcquireLock())
+ {
+ scope.Enqueue("B");
+ scope.Enqueue("C");
+ }
+
+ q!.EnsureDeliveryComplete(); // re-entrant — should not spin
+ }
+ });
+
+ var queue = new DeliveryQueue<string>(_gate, blockingObserver);
+ q = queue;
+
+ EnqueueAndDeliver(queue, "A");
+
+ // Only "A" should be delivered — "B" and "C" were cleared by EnsureDeliveryComplete
+ observer.Items.Should().Equal("A");
+ queue.IsTerminated.Should().BeTrue();
+ }
+
+ [Fact]
+ public void EnsureDeliveryCompleteFromDrainThreadDoesNotDeadlock()
+ {
+ var observer = new ListObserver<string>();
+ DeliveryQueue<string>? q = null;
+
+ var terminatingObserver = new DelegateObserver<string>(_ =>
+ {
+ // Called from drain thread — EnsureDeliveryComplete must detect
+ // re-entrancy via _drainThreadId and skip the spin-wait
+ q!.EnsureDeliveryComplete();
+ });
+
+ var queue = new DeliveryQueue<string>(_gate, terminatingObserver);
+ q = queue;
+
+ // This should NOT deadlock
+ var completed = Task.Run(() => EnqueueAndDeliver(queue, "A"));
+ var finished = Task.WhenAny(completed, Task.Delay(TimeSpan.FromSeconds(5))).Result;
+ finished.Should().BeSameAs(completed, "EnsureDeliveryComplete from drain thread should not deadlock");
+ }
+
+ [Fact]
+ public async Task EnsureDeliveryCompleteWaitsForInFlightDelivery()
+ {
+ var observer = new ListObserver<int>();
+ using var deliveryStarted = new ManualResetEventSlim(false);
+ using var allowDeliveryToFinish = new ManualResetEventSlim(false);
+
+ var slowObserver = new DelegateObserver<int>(item =>
+ {
+ observer.OnNext(item);
+ deliveryStarted.Set();
+ allowDeliveryToFinish.Wait();
+ });
+
+ var queue = new DeliveryQueue<int>(_gate, slowObserver);
+
+ // Start delivering — will block in observer
+ var deliverTask = Task.Run(() => EnqueueAndDeliver(queue, 42));
+ deliveryStarted.Wait();
+
+ // Drain thread is blocked in observer callback. EnsureDeliveryComplete should spin.
+ var terminateTask = Task.Run(() => queue.EnsureDeliveryComplete());
+
+ // Give terminate a moment to enter spin-wait
+ await Task.Delay(100);
+ terminateTask.IsCompleted.Should().BeFalse("should be spinning waiting for delivery");
+
+ // Release the delivery
+ allowDeliveryToFinish.Set();
+
+ await Task.WhenAll(deliverTask, terminateTask);
+ queue.IsTerminated.Should().BeTrue();
+ observer.Items.Should().Equal(42);
+ }
+
+ [Fact]
+ public void TerminalItemsDeliveredBeforeTermination()
+ {
+ var observer = new ListObserver<string>();
+ var queue = new DeliveryQueue<string>(_gate, observer);
+
+ using (var scope = queue.AcquireLock())
+ {
+ scope.Enqueue("A");
+ scope.Enqueue("B");
+ scope.EnqueueCompleted();
+ scope.Enqueue("C"); // should be ignored — after terminal
+ }
+
+ observer.Items.Should().Equal("A", "B");
+ observer.IsCompleted.Should().BeTrue();
+ queue.IsTerminated.Should().BeTrue();
+ }
+
+ [Fact]
+ public void ErrorTerminatesAndClearsPending()
+ {
+ var observer = new ListObserver<string>();
+ var queue = new DeliveryQueue<string>(_gate, observer);
+ var error = new InvalidOperationException("test");
+
+ using (var scope = queue.AcquireLock())
+ {
+ scope.Enqueue("A");
+ scope.EnqueueError(error);
+ scope.Enqueue("B"); // should be ignored
+ }
+
+ observer.Items.Should().Equal("A");
+ observer.Error.Should().BeSameAs(error);
+ queue.IsTerminated.Should().BeTrue();
+ }
+}
\ No newline at end of file
diff --git a/src/DynamicData.Tests/Internal/KeyedDisposableFixture.cs b/src/DynamicData.Tests/Internal/KeyedDisposableFixture.cs
new file mode 100644
index 00000000..4c180c8c
--- /dev/null
+++ b/src/DynamicData.Tests/Internal/KeyedDisposableFixture.cs
@@ -0,0 +1,173 @@
+// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved.
+// Roland Pheasant licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for full license information.
+
+using System;
+using System.Collections.Generic;
+
+using DynamicData.Internal;
+using FluentAssertions;
+using Xunit;
+
+namespace DynamicData.Tests.Internal;
+
+public class KeyedDisposableFixture
+{
+ [Fact]
+ public void AddTracksDisposable()
+ {
+ var tracker = new KeyedDisposable();
+ var disposed = false;
+ var item = new TestDisposable(() => disposed = true);
+
+ tracker.Add("key", item);
+
+ tracker.ContainsKey("key").Should().BeTrue();
+ disposed.Should().BeFalse();
+ }
+
+ [Fact]
+ public void RemoveDisposesItem()
+ {
+ var tracker = new KeyedDisposable();
+ var disposed = false;
+ tracker.Add("key", new TestDisposable(() => disposed = true));
+
+ tracker.Remove("key");
+
+ disposed.Should().BeTrue();
+ tracker.ContainsKey("key").Should().BeFalse();
+ }
+
+ [Fact]
+ public void AddWithSameKeyDisposePrevious()
+ {
+ var tracker = new KeyedDisposable();
+ var disposed1 = false;
+ var disposed2 = false;
+ tracker.Add("key", new TestDisposable(() => disposed1 = true));
+
+ tracker.Add("key", new TestDisposable(() => disposed2 = true));
+
+ disposed1.Should().BeTrue("previous item should be disposed");
+ disposed2.Should().BeFalse("new item should not be disposed");
+ }
+
+ [Fact]
+ public void AddWithSameReferenceDoesNotDispose()
+ {
+ var tracker = new KeyedDisposable();
+ var disposeCount = 0;
+ var item = new TestDisposable(() => disposeCount++);
+
+ tracker.Add("key", item);
+ tracker.Add("key", item); // same reference
+
+ disposeCount.Should().Be(0, "same reference should not be disposed");
+ tracker.ContainsKey("key").Should().BeTrue();
+ }
+
+ [Fact]
+ public void DisposeDisposesAllItems()
+ {
+ var tracker = new KeyedDisposable();
+ var disposedCount = 0;
+ for (var i = 0; i < 5; i++)
+ tracker.Add(i, new TestDisposable(() => disposedCount++));
+
+ tracker.Dispose();
+
+ disposedCount.Should().Be(5);
+ tracker.IsDisposed.Should().BeTrue();
+ }
+
+ [Fact]
+ public void DisposeIsIdempotent()
+ {
+ var tracker = new KeyedDisposable();
+ var disposeCount = 0;
+ tracker.Add("key", new TestDisposable(() => disposeCount++));
+
+ tracker.Dispose();
+ tracker.Dispose();
+
+ disposeCount.Should().Be(1);
+ }
+
+ [Fact]
+ public void AddAfterDisposeDisposesImmediately()
+ {
+ var tracker = new KeyedDisposable();
+ tracker.Dispose();
+
+ var disposed = false;
+ tracker.Add("key", new TestDisposable(() => disposed = true));
+
+ disposed.Should().BeTrue("item added after Dispose should be disposed immediately");
+ }
+
+ [Fact]
+ public void DisposeAggregatesExceptions()
+ {
+ var tracker = new KeyedDisposable();
+ tracker.Add(1, new TestDisposable(() => throw new InvalidOperationException("boom1")));
+ tracker.Add(2, new TestDisposable(() => { }));
+ tracker.Add(3, new TestDisposable(() => throw new InvalidOperationException("boom3")));
+
+ var act = () => tracker.Dispose();
+
+ act.Should().Throw<AggregateException>()
+ .Which.InnerExceptions.Should().HaveCount(2);
+ tracker.Count.Should().Be(0, "all items should be cleared even after exceptions");
+ }
+
+ [Fact]
+ public void AddIfDisposableTracksDisposableItem()
+ {
+ var tracker = new KeyedDisposable();
+ var disposed = false;
+ var item = new TestDisposable(() => disposed = true);
+
+ tracker.AddIfDisposable("key", item);
+
+ tracker.ContainsKey("key").Should().BeTrue();
+
+ tracker.Remove("key");
+ disposed.Should().BeTrue();
+ }
+
+ [Fact]
+ public void AddIfDisposableIgnoresNonDisposableItem()
+ {
+ var tracker = new KeyedDisposable();
+
+ tracker.AddIfDisposable("key", "not disposable");
+
+ tracker.ContainsKey("key").Should().BeFalse();
+ }
+
+ [Fact]
+ public void AddIfDisposableRemovesPreviousWhenNewIsNotDisposable()
+ {
+ var tracker = new KeyedDisposable();
+ var disposed = false;
+ tracker.Add("key", new TestDisposable(() => disposed = true));
+
+ tracker.AddIfDisposable("key", "not disposable");
+
+ disposed.Should().BeTrue("previous disposable should be disposed");
+ tracker.ContainsKey("key").Should().BeFalse();
+ }
+
+ [Fact]
+ public void RemoveNonExistentKeyIsNoOp()
+ {
+ var tracker = new KeyedDisposable();
+ tracker.Remove("nonexistent"); // should not throw
+ }
+
+ private sealed class TestDisposable(Action onDispose) : IDisposable
+ {
+ public void Dispose() => onDispose();
+ }
+}
\ No newline at end of file
diff --git a/src/DynamicData.Tests/Internal/SharedDeliveryQueueFixture.cs b/src/DynamicData.Tests/Internal/SharedDeliveryQueueFixture.cs
new file mode 100644
index 00000000..6b278397
--- /dev/null
+++ b/src/DynamicData.Tests/Internal/SharedDeliveryQueueFixture.cs
@@ -0,0 +1,187 @@
+// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved.
+// Roland Pheasant licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for full license information.
+
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+
+using DynamicData.Internal;
+using FluentAssertions;
+using Xunit;
+
+namespace DynamicData.Tests.Internal;
+
+public class SharedDeliveryQueueFixture
+{
+#if NET9_0_OR_GREATER
+ private readonly Lock _gate = new();
+#else
+ private readonly object _gate = new();
+#endif
+
+ [Fact]
+ public void SingleSourceDeliversItems()
+ {
+ var queue = new SharedDeliveryQueue(_gate);
+ var delivered = new List<int>();
+ var observer = new TestObserver<int>(delivered.Add);
+ var sub = queue.CreateQueue(observer);
+
+ using (var scope = sub.AcquireLock())
+ {
+ scope.Enqueue(1);
+ scope.Enqueue(2);
+ scope.Enqueue(3);
+ }
+
+ delivered.Should().Equal(1, 2, 3);
+ }
+
+ [Fact]
+ public void MultipleSourcesSerializeDelivery()
+ {
+ var queue = new SharedDeliveryQueue(_gate);
+ var delivered = new List<string>();
+ var obs1 = new TestObserver<int>(i => delivered.Add($"int:{i}"));
+ var obs2 = new TestObserver<string>(s => delivered.Add($"str:{s}"));
+ var sub1 = queue.CreateQueue(obs1);
+ var sub2 = queue.CreateQueue(obs2);
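+ // Both sub-queues share the parent queue's gate, so deliveries from the two sources are serialized in enqueue order.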
+
+ using (var scope1 = sub1.AcquireLock())
+ {
+ scope1.Enqueue(1);
+ }
+
+ using (var scope2 = sub2.AcquireLock())
+ {
+ scope2.Enqueue("hello");
+ }
+
+ delivered.Should().Equal("int:1", "str:hello");
+ }
+
+ [Fact]
+ public void ErrorTerminatesAllSubQueues()
+ {
+ var queue = new SharedDeliveryQueue(_gate);
+ var delivered1 = new List<int>();
+ var delivered2 = new List<string>();
+ var obs1 = new TestObserver<int>(delivered1.Add);
+ var obs2 = new TestObserver<string>(delivered2.Add);
+ var sub1 = queue.CreateQueue(obs1);
+ var sub2 = queue.CreateQueue(obs2);
+
+ using (var scope1 = sub1.AcquireLock())
+ {
+ scope1.Enqueue(1);
+ scope1.EnqueueError(new InvalidOperationException("boom"));
+ }
+
+ queue.IsTerminated.Should().BeTrue();
+
+ // Further enqueues should be ignored
+ using (var scope2 = sub2.AcquireLock())
+ {
+ scope2.Enqueue("ignored");
+ }
+
+ delivered1.Should().Equal(1);
+ obs1.Error.Should().NotBeNull();
+ delivered2.Should().BeEmpty();
+ }
+
+ [Fact]
+ public void CompletionDoesNotTerminateParent()
+ {
+ var queue = new SharedDeliveryQueue(_gate);
+ var delivered1 = new List<int>();
+ var delivered2 = new List<string>();
+ var obs1 = new TestObserver<int>(delivered1.Add);
+ var obs2 = new TestObserver<string>(delivered2.Add);
+ var sub1 = queue.CreateQueue(obs1);
+ var sub2 = queue.CreateQueue(obs2);
+
+ using (var scope1 = sub1.AcquireLock())
+ {
+ scope1.Enqueue(1);
+ scope1.EnqueueCompleted();
+ }
+
+ queue.IsTerminated.Should().BeFalse("completion of one sub-queue should not terminate parent");
+ obs1.IsCompleted.Should().BeTrue();
+
+ // Other sub-queue should still work
+ using (var scope2 = sub2.AcquireLock())
+ {
+ scope2.Enqueue("still alive");
+ }
+
+ delivered2.Should().Equal("still alive");
+ }
+
+ [Fact]
+ public void EnsureDeliveryCompleteTerminatesAndWaits()
+ {
+ var queue = new SharedDeliveryQueue(_gate);
+ var observer = new TestObserver<int>(_ => { });
+ var sub = queue.CreateQueue(observer);
+
+ using (var scope = sub.AcquireLock())
+ {
+ scope.Enqueue(1);
+ }
+
+ queue.EnsureDeliveryComplete();
+
+ queue.IsTerminated.Should().BeTrue();
+ }
+
+ [Fact]
+ public async Task ConcurrentMultiSourceDelivery()
+ {
+ const int threadCount = 4;
+ const int itemsPerThread = 200;
+ var queue = new SharedDeliveryQueue(_gate);
+ var delivered = new ConcurrentBag<string>();
+
+ var subQueues = Enumerable.Range(0, threadCount).Select(t =>
+ {
+ var obs = new TestObserver(i => delivered.Add($"{t}:{i}"));
+ return queue.CreateQueue(obs);
+ }).ToArray();
+
+ var tasks = Enumerable.Range(0, threadCount).Select(t => Task.Run(() =>
+ {
+ for (var i = 0; i < itemsPerThread; i++)
+ {
+ using var scope = subQueues[t].AcquireLock();
+ scope.Enqueue(i);
+ }
+ })).ToArray();
+
+ await Task.WhenAll(tasks);
+
+ delivered.Count.Should().Be(threadCount * itemsPerThread);
+
+ // Each thread's items should all be present
+ for (var t = 0; t < threadCount; t++)
+ {
+ var threadItems = delivered.Where(s => s.StartsWith($"{t}:")).Count();
+ threadItems.Should().Be(itemsPerThread);
+ }
+ }
+
+ private sealed class TestObserver<T>(Action<T> onNext) : IObserver<T>
+ {
+ public Exception? Error { get; private set; }
+ public bool IsCompleted { get; private set; }
+
+ public void OnNext(T value) => onNext(value);
+ public void OnError(Exception error) => Error = error;
+ public void OnCompleted() => IsCompleted = true;
+ }
+}
\ No newline at end of file
diff --git a/src/DynamicData/Binding/SortAndBind.cs b/src/DynamicData/Binding/SortAndBind.cs
index 80cddd3e..1e89f6af 100644
--- a/src/DynamicData/Binding/SortAndBind.cs
+++ b/src/DynamicData/Binding/SortAndBind.cs
@@ -65,11 +65,11 @@ public SortAndBind(IObservable> source,
comparerChanged = comparerChanged.ObserveOn(scheduler);
}
- var locker = InternalEx.NewLock();
+ var queue = new SharedDeliveryQueue();
SortApplicator? sortApplicator = null;
// Create a new sort applicator each time.
- var latestComparer = comparerChanged.Synchronize(locker)
+ var latestComparer = comparerChanged.SynchronizeSafe(queue)
.Subscribe(comparer =>
{
sortApplicator = new SortApplicator(_cache, target, comparer, options);
@@ -77,7 +77,7 @@ public SortAndBind(IObservable> source,
});
// Listen to changes and apply the sorting
- var subscriber = source.Synchronize(locker)
+ var subscriber = source.SynchronizeSafe(queue)
.Select((changes, index) =>
{
_cache.Clone(changes);
diff --git a/src/DynamicData/Cache/Internal/AsyncDisposeMany.cs b/src/DynamicData/Cache/Internal/AsyncDisposeMany.cs
index a134360c..3d697fec 100644
--- a/src/DynamicData/Cache/Internal/AsyncDisposeMany.cs
+++ b/src/DynamicData/Cache/Internal/AsyncDisposeMany.cs
@@ -27,7 +27,6 @@ public static IObservable> Create(
.Create>(downstreamObserver =>
{
var itemsByKey = new Dictionary();
-
var synchronizationGate = InternalEx.NewLock();
var disposals = new Subject>();
@@ -35,16 +34,13 @@ public static IObservable> Create(
.Merge()
.IgnoreElements()
.Concat(Observable.Return(Unit.Default))
- // If no one subscribes to this stream, disposals won't actually occur, so make sure we have one (and only one) regardless of what the consumer does.
.Publish()
.AutoConnect(0);
- // Make sure the consumer gets a chance to subscribe BEFORE we actually start processing items, so there's no risk of the consumer missing notifications.
disposalsCompletedAccessor.Invoke(disposalsCompleted);
var sourceSubscription = source
- .Synchronize(synchronizationGate)
- // Using custom notification handlers instead of .Do() to make sure that we're not disposing items until AFTER we've notified all downstream listeners to remove them from their cached or bound collections.
+ .SynchronizeSafe(synchronizationGate, out var queue)
.SubscribeSafe(
onNext: upstreamChanges =>
{
@@ -70,24 +66,19 @@ public static IObservable> Create(
onError: error =>
{
downstreamObserver.OnError(error);
-
TearDown();
},
onCompleted: () =>
{
downstreamObserver.OnCompleted();
-
TearDown();
});
return Disposable.Create(() =>
{
- lock (synchronizationGate)
- {
- sourceSubscription.Dispose();
-
- TearDown();
- }
+ sourceSubscription.Dispose();
+ queue.EnsureDeliveryComplete();
+ TearDown();
});
void TearDown()
@@ -99,7 +90,6 @@ void TearDown()
foreach (var item in itemsByKey.Values)
TryDisposeItem(item);
disposals.OnCompleted();
-
itemsByKey.Clear();
}
catch (Exception error)
@@ -119,4 +109,4 @@ void TryDisposeItem(TObject item)
});
}
}
-#endif
+#endif
\ No newline at end of file
diff --git a/src/DynamicData/Cache/Internal/AutoRefresh.cs b/src/DynamicData/Cache/Internal/AutoRefresh.cs
index d489a56d..9ef24e41 100644
--- a/src/DynamicData/Cache/Internal/AutoRefresh.cs
+++ b/src/DynamicData/Cache/Internal/AutoRefresh.cs
@@ -32,8 +32,8 @@ public IObservable> Run() => Observable.Create list.Count > 0).Select(items => new ChangeSet(items));
// publish refreshes and underlying changes
- var locker = InternalEx.NewLock();
- var publisher = shared.Synchronize(locker).Merge(refreshChanges.Synchronize(locker)).SubscribeSafe(observer);
+ var queue = new SharedDeliveryQueue();
+ var publisher = shared.SynchronizeSafe(queue).Merge(refreshChanges.SynchronizeSafe(queue)).SubscribeSafe(observer);
return new CompositeDisposable(publisher, shared.Connect());
});
diff --git a/src/DynamicData/Cache/Internal/BatchIf.cs b/src/DynamicData/Cache/Internal/BatchIf.cs
index 17d5e4a3..54da2673 100644
--- a/src/DynamicData/Cache/Internal/BatchIf.cs
+++ b/src/DynamicData/Cache/Internal/BatchIf.cs
@@ -1,4 +1,4 @@
-// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved.
+// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved.
// Roland Pheasant licenses this file to you under the MIT license.
// See the LICENSE file in the project root for full license information.
@@ -23,7 +23,7 @@ public IObservable> Run() => Observable.Create
{
var batchedChanges = new List>();
- var locker = InternalEx.NewLock();
+ var queue = new SharedDeliveryQueue();
var paused = initialPauseState;
var timeoutDisposer = new SerialDisposable();
var intervalTimerDisposer = new SerialDisposable();
@@ -46,7 +46,7 @@ void ResumeAction()
}
IDisposable IntervalFunction() =>
- intervalTimer.Synchronize(locker).Finally(() => paused = false).Subscribe(
+ intervalTimer.SynchronizeSafe(queue).Finally(() => paused = false).Subscribe(
_ =>
{
paused = false;
@@ -62,7 +62,7 @@ IDisposable IntervalFunction() =>
intervalTimerDisposer.Disposable = IntervalFunction();
}
- var pausedHandler = _pauseIfTrueSelector.Synchronize(locker).Subscribe(
+ var pausedHandler = _pauseIfTrueSelector.SynchronizeSafe(queue).Subscribe(
p =>
{
paused = p;
@@ -78,7 +78,7 @@ IDisposable IntervalFunction() =>
}
else if (timeOut.HasValue)
{
- timeoutDisposer.Disposable = Observable.Timer(timeOut.Value, _scheduler).Synchronize(locker).Subscribe(
+ timeoutDisposer.Disposable = Observable.Timer(timeOut.Value, _scheduler).SynchronizeSafe(queue).Subscribe(
_ =>
{
paused = false;
@@ -87,7 +87,7 @@ IDisposable IntervalFunction() =>
}
});
- var publisher = _source.Synchronize(locker).Subscribe(
+ var publisher = _source.SynchronizeSafe(queue).Subscribe(
changes =>
{
batchedChanges.Add(changes);
diff --git a/src/DynamicData/Cache/Internal/DisposeMany.cs b/src/DynamicData/Cache/Internal/DisposeMany.cs
index 7271896c..f5336bd4 100644
--- a/src/DynamicData/Cache/Internal/DisposeMany.cs
+++ b/src/DynamicData/Cache/Internal/DisposeMany.cs
@@ -1,4 +1,4 @@
-// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved.
+// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved.
// Roland Pheasant licenses this file to you under the MIT license.
// See the LICENSE file in the project root for full license information.
@@ -17,11 +17,11 @@ internal sealed class DisposeMany(IObservable> Run()
=> Observable.Create>(observer =>
{
- // Will be locking on cachedItems directly, instead of using an anonymous gate object. This is acceptable, since it's a privately-held object, there's no risk of deadlock from other consumers locking on it.
- var cachedItems = new Dictionary();
+ var locker = InternalEx.NewLock();
+ var tracked = new KeyedDisposable();
var sourceSubscription = _source
- .Synchronize(cachedItems)
+ .SynchronizeSafe(locker, out var queue)
.SubscribeSafe(Observer.Create>(
onNext: changeSet =>
{
@@ -31,53 +31,25 @@ public IObservable> Run()
{
switch (change.Reason)
{
+ case ChangeReason.Add:
case ChangeReason.Update:
- if (change.Previous.HasValue && !EqualityComparer.Default.Equals(change.Current, change.Previous.Value))
- {
- (change.Previous.Value as IDisposable)?.Dispose();
- }
-
+ tracked.AddIfDisposable(change.Key, change.Current);
break;
case ChangeReason.Remove:
- (change.Current as IDisposable)?.Dispose();
+ tracked.Remove(change.Key);
break;
}
}
-
- cachedItems.Clone(changeSet);
- },
- onError: error =>
- {
- observer.OnError(error);
-
- ProcessFinalization(cachedItems);
},
- onCompleted: () =>
- {
- observer.OnCompleted();
-
- ProcessFinalization(cachedItems);
- }));
+ onError: observer.OnError,
+ onCompleted: observer.OnCompleted));
return Disposable.Create(() =>
{
sourceSubscription.Dispose();
-
- lock (cachedItems)
- {
- ProcessFinalization(cachedItems);
- }
+ queue.EnsureDeliveryComplete();
+ tracked.Dispose();
});
});
-
- private static void ProcessFinalization(Dictionary cachedItems)
- {
- foreach (var pair in cachedItems)
- {
- (pair.Value as IDisposable)?.Dispose();
- }
-
- cachedItems.Clear();
- }
-}
+}
\ No newline at end of file
diff --git a/src/DynamicData/Cache/Internal/DynamicCombiner.cs b/src/DynamicData/Cache/Internal/DynamicCombiner.cs
index a5eba2bd..f047b88d 100644
--- a/src/DynamicData/Cache/Internal/DynamicCombiner.cs
+++ b/src/DynamicData/Cache/Internal/DynamicCombiner.cs
@@ -1,4 +1,4 @@
-// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved.
+// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved.
// Roland Pheasant licenses this file to you under the MIT license.
// See the LICENSE file in the project root for full license information.
@@ -16,19 +16,19 @@ internal sealed class DynamicCombiner(IObservableList> Run() => Observable.Create>(
observer =>
{
- var locker = InternalEx.NewLock();
+ var queue = new SharedDeliveryQueue();
// this is the resulting cache which produces all notifications
var resultCache = new ChangeAwareCache();
// Transform to a merge container.
// This populates a RefTracker when the original source is subscribed to
- var sourceLists = _source.Connect().Synchronize(locker).Transform(changeSet => new MergeContainer(changeSet)).AsObservableList();
+ var sourceLists = _source.Connect().SynchronizeSafe(queue).Transform(changeSet => new MergeContainer(changeSet)).AsObservableList();
var sharedLists = sourceLists.Connect().Publish();
// merge the items back together
- var allChanges = sharedLists.MergeMany(mc => mc.Source).Synchronize(locker).Subscribe(
+ var allChanges = sharedLists.MergeMany(mc => mc.Source).SynchronizeSafe(queue).Subscribe(
changes =>
{
// Populate result list and check for changes
diff --git a/src/DynamicData/Cache/Internal/FullJoin.cs b/src/DynamicData/Cache/Internal/FullJoin.cs
index f4e4f543..f6395255 100644
--- a/src/DynamicData/Cache/Internal/FullJoin.cs
+++ b/src/DynamicData/Cache/Internal/FullJoin.cs
@@ -1,4 +1,4 @@
-// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved.
+// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved.
// Roland Pheasant licenses this file to you under the MIT license.
// See the LICENSE file in the project root for full license information.
@@ -26,10 +26,11 @@ public IObservable> Run() => Observable.Creat
observer =>
{
var locker = InternalEx.NewLock();
+ var queue = new SharedDeliveryQueue(locker);
// create local backing stores
- var leftCache = _left.Synchronize(locker).AsObservableCache(false);
- var rightCache = _right.Synchronize(locker).ChangeKey(_rightKeySelector).AsObservableCache(false);
+ var leftCache = _left.SynchronizeSafe(queue).AsObservableCache();
+ var rightCache = _right.SynchronizeSafe(queue).ChangeKey(_rightKeySelector).AsObservableCache();
// joined is the final cache
var joinedCache = new ChangeAwareCache();
diff --git a/src/DynamicData/Cache/Internal/GroupOn.cs b/src/DynamicData/Cache/Internal/GroupOn.cs
index 619685af..710da898 100644
--- a/src/DynamicData/Cache/Internal/GroupOn.cs
+++ b/src/DynamicData/Cache/Internal/GroupOn.cs
@@ -22,12 +22,12 @@ internal sealed class GroupOn(IObservable> Run() => Observable.Create>(
observer =>
{
- var locker = InternalEx.NewLock();
+ var queue = new SharedDeliveryQueue();
var grouper = new Grouper(_groupSelectorKey);
- var groups = _source.Finally(observer.OnCompleted).Synchronize(locker).Select(grouper.Update).Where(changes => changes.Count != 0);
+ var groups = _source.Finally(observer.OnCompleted).SynchronizeSafe(queue).Select(grouper.Update).Where(changes => changes.Count != 0);
- var regroup = _regrouper.Synchronize(locker).Select(_ => grouper.Regroup()).Where(changes => changes.Count != 0);
+ var regroup = _regrouper.SynchronizeSafe(queue).Select(_ => grouper.Regroup()).Where(changes => changes.Count != 0);
var published = groups.Merge(regroup).Publish();
var subscriber = published.SubscribeSafe(observer);
diff --git a/src/DynamicData/Cache/Internal/GroupOnDynamic.cs b/src/DynamicData/Cache/Internal/GroupOnDynamic.cs
index 815baa34..5bf285ea 100644
--- a/src/DynamicData/Cache/Internal/GroupOnDynamic.cs
+++ b/src/DynamicData/Cache/Internal/GroupOnDynamic.cs
@@ -17,13 +17,14 @@ internal sealed class GroupOnDynamic(IObservable> Run() => Observable.Create>(observer =>
{
var dynamicGrouper = new DynamicGrouper();
+ var queue = new SharedDeliveryQueue();
var notGrouped = new Cache();
var hasSelector = false;
// Create shared observables for the 3 inputs
- var sharedSource = source.Synchronize(dynamicGrouper).Publish();
- var sharedGroupSelector = selectGroupObservable.DistinctUntilChanged().Synchronize(dynamicGrouper).Publish();
- var sharedRegrouper = (regrouper ?? Observable.Empty()).Synchronize(dynamicGrouper).Publish();
+ var sharedSource = source.SynchronizeSafe(queue).Publish();
+ var sharedGroupSelector = selectGroupObservable.DistinctUntilChanged().SynchronizeSafe(queue).Publish();
+ var sharedRegrouper = (regrouper ?? Observable.Empty()).SynchronizeSafe(queue).Publish();
// The first value from the Group Selector should update the Grouper with all the values seen so far
// Then indicate a selector has been found. Subsequent values should just update the group selector.
diff --git a/src/DynamicData/Cache/Internal/GroupOnImmutable.cs b/src/DynamicData/Cache/Internal/GroupOnImmutable.cs
index a7a14e3e..3896ca0b 100644
--- a/src/DynamicData/Cache/Internal/GroupOnImmutable.cs
+++ b/src/DynamicData/Cache/Internal/GroupOnImmutable.cs
@@ -21,12 +21,12 @@ internal sealed class GroupOnImmutable(IObservable> Run() => Observable.Create>(
observer =>
{
- var locker = InternalEx.NewLock();
+ var queue = new SharedDeliveryQueue();
var grouper = new Grouper(_groupSelectorKey);
- var groups = _source.Synchronize(locker).Select(grouper.Update).Where(changes => changes.Count != 0);
+ var groups = _source.SynchronizeSafe(queue).Select(grouper.Update).Where(changes => changes.Count != 0);
- var regroup = _regrouper.Synchronize(locker).Select(_ => grouper.Regroup()).Where(changes => changes.Count != 0);
+ var regroup = _regrouper.SynchronizeSafe(queue).Select(_ => grouper.Regroup()).Where(changes => changes.Count != 0);
return groups.Merge(regroup).SubscribeSafe(observer);
});
diff --git a/src/DynamicData/Cache/Internal/InnerJoin.cs b/src/DynamicData/Cache/Internal/InnerJoin.cs
index ae9858e7..f7ccf177 100644
--- a/src/DynamicData/Cache/Internal/InnerJoin.cs
+++ b/src/DynamicData/Cache/Internal/InnerJoin.cs
@@ -1,4 +1,4 @@
-// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved.
+// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved.
// Roland Pheasant licenses this file to you under the MIT license.
// See the LICENSE file in the project root for full license information.
@@ -26,13 +26,14 @@ internal sealed class InnerJoin