Skip to content
Open
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,11 @@ IObservable<MarketPrice> AddRemovePrices(Market market, int priceCount, int para
}
while (adding);

// Allow any in-flight notification deliveries to complete before checking results.
// With the queue-based drain pattern, Edit() returns after enqueueing but delivery
// may still be in progress on another thread. Give the drain a moment to finish.
await Task.Delay(250).ConfigureAwait(false);

// Verify the results
CheckResultContents(_marketCacheResults, priceResults, Market.RatingCompare);
}
Expand Down
138 changes: 138 additions & 0 deletions src/DynamicData.Tests/Cache/SourceCacheFixture.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

using DynamicData.Tests.Domain;

Expand Down Expand Up @@ -188,4 +191,139 @@ public void StaticFilterRemove()

// Value-semantic test type: records compare structurally (by Id and Value), not by reference.
public record class SomeObject(int Id, int Value);


[Fact]
public async Task ConcurrentEditsShouldNotDeadlockWithSubscribersThatModifyOtherCaches()
{
    const int itemCount = 100;

    using var cacheA = new SourceCache<TestItem, string>(static x => x.Key);
    using var cacheB = new SourceCache<TestItem, string>(static x => x.Key);
    using var destination = new SourceCache<TestItem, string>(static x => x.Key);
    // PopulateInto makes each source's subscriber write into a third cache,
    // so concurrent edits exercise cross-cache re-entrancy.
    using var subA = cacheA.Connect().PopulateInto(destination);
    using var subB = cacheB.Connect().PopulateInto(destination);
    using var results = destination.Connect().AsAggregator();

    var taskA = Task.Run(() =>
    {
        for (var i = 0; i < itemCount; i++)
        {
            cacheA.AddOrUpdate(new TestItem($"a-{i}", $"ValueA-{i}"));
        }
    });

    var taskB = Task.Run(() =>
    {
        for (var i = 0; i < itemCount; i++)
        {
            cacheB.AddOrUpdate(new TestItem($"b-{i}", $"ValueB-{i}"));
        }
    });

    var completed = Task.WhenAll(taskA, taskB);
    var finished = await Task.WhenAny(completed, Task.Delay(TimeSpan.FromSeconds(10)));

    finished.Should().BeSameAs(completed, "concurrent edits with cross-cache subscribers should not deadlock");

    // Observe the writer tasks so any exception they threw surfaces here
    // instead of being silently dropped (Task.WhenAny never unwraps faults).
    await completed;

    results.Error.Should().BeNull();
    results.Data.Count.Should().Be(itemCount * 2, "all items from both caches should arrive in the destination");
    results.Data.Items.Should().BeEquivalentTo([.. cacheA.Items, .. cacheB.Items], "all items should be in the destination");
}

[Fact]
public async Task DirectCrossWriteDoesNotDeadlock()
{
    const int iterations = 100;

    for (var iter = 0; iter < iterations; iter++)
    {
        using var cacheA = new SourceCache<TestItem, string>(static x => x.Key);
        using var cacheB = new SourceCache<TestItem, string>(static x => x.Key);

        // Bidirectional: A items flow into B, B items flow into A.
        // Filter by prefix prevents infinite feedback.
        using var aToB = cacheA.Connect()
            .Filter(static x => x.Key.StartsWith('a'))
            .Transform(static (item, _) => new TestItem("from-a-" + item.Key, item.Value))
            .PopulateInto(cacheB);

        using var bToA = cacheB.Connect()
            .Filter(static x => x.Key.StartsWith('b'))
            .Transform(static (item, _) => new TestItem("from-b-" + item.Key, item.Value))
            .PopulateInto(cacheA);

        // Barrier is IDisposable; dispose it each iteration instead of leaking
        // 100 of them. It lines both writers up so they start simultaneously.
        using var barrier = new Barrier(2);

        var taskA = Task.Run(() =>
        {
            barrier.SignalAndWait();
            for (var i = 0; i < 1000; i++)
            {
                cacheA.AddOrUpdate(new TestItem("a" + i, "V" + i));
            }
        });

        var taskB = Task.Run(() =>
        {
            barrier.SignalAndWait();
            for (var i = 0; i < 1000; i++)
            {
                cacheB.AddOrUpdate(new TestItem("b" + i, "V" + i));
            }
        });

        var completed = Task.WhenAll(taskA, taskB);
        var finished = await Task.WhenAny(completed, Task.Delay(TimeSpan.FromSeconds(30)));

        finished.Should().BeSameAs(completed, $"iteration {iter}: bidirectional cross-cache writes should not deadlock");

        // Unwrap any writer-task exception; Task.WhenAny does not observe faults.
        await completed;
    }
}

[Fact]
public void ConnectDuringDeliveryDoesNotDuplicate()
{
    // Proves the dequeue-to-delivery gap. A slow subscriber holds delivery
    // while another thread connects. The new subscriber's snapshot is taken
    // under the lock, so it will not see a duplicate Add.
    using var cache = new SourceCache<TestItem, string>(static x => x.Key);

    // ManualResetEventSlim is IDisposable — dispose both events.
    using var delivering = new ManualResetEventSlim(false);
    using var connectDone = new ManualResetEventSlim(false);

    // First subscriber: signals when delivery starts, then waits
    using var slowSub = cache.Connect().Subscribe(_ =>
    {
        delivering.Set();
        connectDone.Wait(TimeSpan.FromSeconds(5));
    });

    // Write one item -- delivery starts, slow subscriber blocks
    var writeTask = Task.Run(() => cache.AddOrUpdate(new TestItem("k1", "v1")));

    // Wait for delivery to be in progress. Fail loudly on timeout instead of
    // ignoring the result and racing through a meaningless assertion.
    delivering.Wait(TimeSpan.FromSeconds(5)).Should().BeTrue("delivery should have started within the timeout");

    // Now Connect on the main thread while delivery is in progress.
    // The item is already committed to ReaderWriter and dequeued from
    // the delivery queue, but OnNext hasn't finished iterating observers.
    var duplicateKeys = new List<string>();
    using var newSub = cache.Connect().Subscribe(changes =>
    {
        foreach (var c in changes)
        {
            if (c.Reason == ChangeReason.Add)
            {
                duplicateKeys.Add(c.Current.Key);
            }
        }
    });

    // Let delivery finish, and verify the write actually completed.
    connectDone.Set();
    writeTask.Wait(TimeSpan.FromSeconds(5)).Should().BeTrue("the write should complete once delivery is unblocked");

    // Check: k1 should appear exactly once (either in snapshot or stream, not both)
    var k1Count = duplicateKeys.Count(k => k == "k1");
    k1Count.Should().Be(1, "k1 should appear exactly once via Connect, not duplicated from snapshot + in-flight delivery");
}
// Minimal keyed item for these tests: Key is used as the cache key (see the
// `static x => x.Key` selectors above); Value is an arbitrary payload.
private sealed record TestItem(string Key, string Value);
}
Loading
Loading