Skip to content
Open
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
d2a8dc4
Refactor SwappableLock to add NET9+ Lock overloads
dwcullop Apr 6, 2026
c4b89af
Fix race in ExpireAfter when item is removed or updated before expira…
dwcullop Apr 6, 2026
501c9f2
fix: Replace lock-during-notification with queue-based drain to preve…
dwcullop Apr 6, 2026
72ea32c
Refactor to use one lock and a serialized delivery queue to ensure Rx…
dwcullop Apr 7, 2026
ab41353
Add read-only lock for DeliveryQueue and improve safety
dwcullop Apr 7, 2026
c459365
Refactor cross-cache deadlock test to use operators
dwcullop Apr 7, 2026
8eb5ffb
Simplify delivery queue; remove pending count logic
dwcullop Apr 7, 2026
97b721a
Support .NET 9+ locking in SwappableLock's SwapTo method
dwcullop Apr 7, 2026
7625ac2
Use |= to accumulate expiration changes correctly
dwcullop Apr 7, 2026
5e4ad0e
Refactor DeliveryQueue exception handling logic
dwcullop Apr 7, 2026
a92b596
Fix MergeMany stress test timing for queue-based delivery
dwcullop Apr 7, 2026
92f1afe
Update src/DynamicData.Tests/Cache/SourceCacheFixture.cs
dwcullop Apr 8, 2026
7544f8c
Improve test reliability and ObservableCache disposal safety
dwcullop Apr 8, 2026
1665536
Merge branch 'bugfix/lock_inversion' of https://github.com/dwcullop/D…
dwcullop Apr 8, 2026
836d8f3
Prevent duplicate notifications on Connect during delivery
dwcullop Apr 8, 2026
b5c7862
Improve suspend/resume notification handling and tests
dwcullop Apr 8, 2026
b8e03b6
Improve thread safety, tests, and notification delivery
dwcullop Apr 8, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Reactive.Disposables;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;
using Bogus;
using DynamicData.Kernel;
Expand Down Expand Up @@ -90,9 +91,11 @@ IObservable<MarketPrice> AddRemovePrices(Market market, int priceCount, int para
.Parallelize(priceCount, parallel, obs => obs.StressAddRemove(market.PricesCache, _ => GetRemoveTime(), scheduler))
.Finally(market.PricesCache.Dispose);

var merged = _marketCache.Connect().MergeManyChangeSets(market => market.LatestPrices, Market.RatingCompare, resortOnSourceRefresh: true);
var merged = _marketCache.Connect().MergeManyChangeSets(market => market.LatestPrices, Market.RatingCompare, resortOnSourceRefresh: true).Publish();
var adding = true;
var cacheCompleted = merged.LastOrDefaultAsync().ToTask();
using var priceResults = merged.AsAggregator();
using var connect = merged.Connect();

// Start asynchronously modifying the parent list and the child lists
using var addingSub = AddRemoveStress(marketCount, priceCount, Environment.ProcessorCount, TaskPoolScheduler.Default)
Expand All @@ -118,6 +121,9 @@ IObservable<MarketPrice> AddRemovePrices(Market market, int priceCount, int para
}
while (adding);

// Wait for the source cache to finish delivering all notifications.
await cacheCompleted;

// Verify the results
CheckResultContents(_marketCacheResults, priceResults, Market.RatingCompare);
}
Expand Down
167 changes: 167 additions & 0 deletions src/DynamicData.Tests/Cache/SourceCacheFixture.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

using DynamicData.Tests.Domain;

Expand Down Expand Up @@ -188,4 +191,168 @@ public void StaticFilterRemove()

// Immutable value-semantic test item; value equality on (Id, Value) is what the tests above rely on.
public record class SomeObject(int Id, int Value);


[Fact]
public async Task ConcurrentEditsShouldNotDeadlockWithSubscribersThatModifyOtherCaches()
{
    // Two source caches funnel into a shared destination via PopulateInto.
    // Each PopulateInto subscriber writes into `destination` from inside the
    // source cache's notification path, so concurrent edits on both sources
    // exercise the cross-cache lock-inversion scenario this PR fixes.
    const int itemCount = 100;

    using var cacheA = new SourceCache<TestItem, string>(static x => x.Key);
    using var cacheB = new SourceCache<TestItem, string>(static x => x.Key);
    using var destination = new SourceCache<TestItem, string>(static x => x.Key);
    using var subA = cacheA.Connect().PopulateInto(destination);
    using var subB = cacheB.Connect().PopulateInto(destination);
    using var results = destination.Connect().AsAggregator();

    var taskA = Task.Run(() =>
    {
        for (var i = 0; i < itemCount; i++)
        {
            cacheA.AddOrUpdate(new TestItem($"a-{i}", $"ValueA-{i}"));
        }
    });

    var taskB = Task.Run(() =>
    {
        for (var i = 0; i < itemCount; i++)
        {
            cacheB.AddOrUpdate(new TestItem($"b-{i}", $"ValueB-{i}"));
        }
    });

    var completed = Task.WhenAll(taskA, taskB);
    var finished = await Task.WhenAny(completed, Task.Delay(TimeSpan.FromSeconds(10)));

    finished.Should().BeSameAs(completed, "concurrent edits with cross-cache subscribers should not deadlock");

    // Task.WhenAny does not unwrap faults: if a writer task threw, `completed`
    // would still "win" the race and the exception would go unobserved.
    // Awaiting here surfaces any writer failure as a test failure.
    await completed;

    results.Error.Should().BeNull();
    results.Data.Count.Should().Be(itemCount * 2, "all items from both caches should arrive in the destination");
    results.Data.Items.Should().BeEquivalentTo([.. cacheA.Items, .. cacheB.Items], "all items should be in the destination");
}

[Fact]
public async Task DirectCrossWriteDoesNotDeadlock()
{
    // Repeats the scenario many times because the deadlock is a timing race;
    // a single pass can easily miss the interleaving that triggers it.
    const int iterations = 100;

    for (var iter = 0; iter < iterations; iter++)
    {
        using var cacheA = new SourceCache<TestItem, string>(static x => x.Key);
        using var cacheB = new SourceCache<TestItem, string>(static x => x.Key);

        // Bidirectional: A items flow into B, B items flow into A.
        // Filter by prefix prevents infinite feedback.
        using var aToB = cacheA.Connect()
            .Filter(static x => x.Key.StartsWith('a'))
            .Transform(static (item, _) => new TestItem("from-a-" + item.Key, item.Value))
            .PopulateInto(cacheB);

        using var bToA = cacheB.Connect()
            .Filter(static x => x.Key.StartsWith('b'))
            .Transform(static (item, _) => new TestItem("from-b-" + item.Key, item.Value))
            .PopulateInto(cacheA);

        // The barrier releases both writers at the same moment to maximise
        // the chance of the two caches taking their locks in opposite order.
        using var barrier = new Barrier(2);

        var taskA = Task.Run(() =>
        {
            barrier.SignalAndWait();
            for (var i = 0; i < 1000; i++)
            {
                cacheA.AddOrUpdate(new TestItem("a" + i, "V" + i));
            }
        });

        var taskB = Task.Run(() =>
        {
            barrier.SignalAndWait();
            for (var i = 0; i < 1000; i++)
            {
                cacheB.AddOrUpdate(new TestItem("b" + i, "V" + i));
            }
        });

        var completed = Task.WhenAll(taskA, taskB);
        var finished = await Task.WhenAny(completed, Task.Delay(TimeSpan.FromSeconds(30)));

        finished.Should().BeSameAs(completed, $"iteration {iter}: bidirectional cross-cache writes should not deadlock");

        // Task.WhenAny never unwraps faults, so a writer exception would
        // otherwise be swallowed; awaiting propagates it to the test.
        await completed;
    }
}

[Fact]
public void ConnectDuringDeliveryDoesNotDuplicate()
{
    // Exploits the dequeue-to-OnNext window. Thread A writes two items in
    // separate batches. The first delivery is held by a slow subscriber.
    // While item1 delivery is blocked, item2 is committed to ReaderWriter
    // and sitting in the queue. Thread B calls Connect(), takes a snapshot
    // (sees both items), subscribes to _changes, then item2 is delivered
    // via OnNext — producing a duplicate if not guarded by a generation counter.
    using var cache = new SourceCache<TestItem, string>(static x => x.Key);

    using var delivering = new ManualResetEventSlim(false);
    using var item2Written = new ManualResetEventSlim(false);
    using var connectDone = new ManualResetEventSlim(false);

    var firstDelivery = true;

    // First subscriber: blocks on the first delivery to create the window
    using var slowSub = cache.Connect().Subscribe(_ =>
    {
        if (firstDelivery)
        {
            firstDelivery = false;
            delivering.Set();

            // Wait until item2 has been written and the Connect has subscribed
            connectDone.Wait(TimeSpan.FromSeconds(5));
        }
    });

    // Write item1 on a background thread — delivery starts, slow subscriber blocks
    var writeTask = Task.Run(() =>
    {
        cache.AddOrUpdate(new TestItem("k1", "v1"));
    });

    // Wait for delivery of item1 to be in progress (slow sub is blocking)
    delivering.Wait(TimeSpan.FromSeconds(5)).Should().BeTrue("delivery should have started");

    // Now write item2 on another thread. It will acquire the lock, commit to
    // ReaderWriter, enqueue a notification, and return. The notification sits
    // in the queue because the deliverer (Thread A) is blocked by the slow sub.
    var writeTask2 = Task.Run(() =>
    {
        cache.AddOrUpdate(new TestItem("k2", "v2"));
        item2Written.Set();
    });
    item2Written.Wait(TimeSpan.FromSeconds(5)).Should().BeTrue("item2 should have been written");

    // Now Connect on the main thread. The snapshot from ReaderWriter includes
    // BOTH k1 and k2. The subscription to _changes is added. When the slow
    // subscriber unblocks, item2's notification will be delivered via OnNext
    // and the new subscriber will see k2 again — a duplicate Add.
    var addCounts = new Dictionary<string, int>();
    using var newSub = cache.Connect().Subscribe(changes =>
    {
        foreach (var c in changes)
        {
            if (c.Reason == ChangeReason.Add)
            {
                var key = c.Current.Key;
                addCounts[key] = addCounts.GetValueOrDefault(key) + 1;
            }
        }
    });

    // Unblock the slow subscriber — delivery resumes, item2 delivered.
    // Assert the Wait results: if either writer hangs, fail here with a clear
    // message instead of falling through to misleading count assertions.
    connectDone.Set();
    writeTask.Wait(TimeSpan.FromSeconds(5)).Should().BeTrue("item1 write/delivery should complete after the slow subscriber unblocks");
    writeTask2.Wait(TimeSpan.FromSeconds(5)).Should().BeTrue("item2 write/delivery should complete after the slow subscriber unblocks");

    // Each key should appear exactly once in the new subscriber's view
    addCounts.GetValueOrDefault("k1").Should().Be(1, "k1 should appear once (snapshot only)");
    addCounts.GetValueOrDefault("k2").Should().Be(1, "k2 should appear once, not duplicated from snapshot + queued delivery");
}

// Minimal keyed item for the cross-cache tests; the cache key selector uses Key.
private sealed record TestItem(string Key, string Value);
}
Loading
Loading