Fix cross-cache deadlocks via queue-drain delivery pattern#1079
Open
dwcullop wants to merge 37 commits intoreactivemarbles:mainfrom
Open
Fix cross-cache deadlocks via queue-drain delivery pattern#1079dwcullop wants to merge 37 commits intoreactivemarbles:mainfrom
dwcullop wants to merge 37 commits intoreactivemarbles:mainfrom
Conversation
…nt cross-cache deadlock The original code held _locker while calling _changes.OnNext(), so subscriber callbacks that propagated to other caches created ABBA deadlocks when concurrent writes were happening on those caches. New design: - Single _locker protects mutation and queue state - Write paths: lock, mutate, enqueue changeset, release lock, then drain - DrainOutsideLock delivers notifications with no lock held - _isDraining flag ensures only one thread drains at a time, preserving Rx serialization contract - Re-entrant writes enqueue and return; the outer drain loop delivers them sequentially - Connect/Watch/CountChanged use Skip(pendingCount) to avoid duplicating items already in the snapshot, with no delivery under lock - Terminal events (OnCompleted/OnError) routed through drain queue - Preview remains synchronous under _locker (required by ReaderWriter) - Suspension state captured at enqueue time; re-checked at delivery - try/catch resets _isDraining on exception - volatile _isTerminated prevents post-dispose delivery
… contracts and thread-safety.
Introduce ReadOnlyScopedAccess to DeliveryQueue<TItem> for safe, read-only access to queue state under lock. Update tests and ObservableCache<TObject, TKey> to use AcquireReadLock() for reading PendingCount, replacing direct property access and manual locking. Make PendingCount private and encapsulate lock release logic. Wrap _suspensionTracker disposal in a lock for thread safety. These changes improve thread safety and clarify access patterns for queue state.
Refactored DirectCrossWriteDoesNotDeadlock to use Connect, Filter, Transform, and PopulateInto operators for bidirectional cache updates, replacing manual subscription logic. Increased test timeout and clarified assertion message. Prevented infinite feedback with key prefix filtering.
Refactored DeliveryQueue<TItem> to eliminate pending item tracking and PendingCount, removing related read-only lock APIs. ObservableCache<TObject, TKey> now ensures new subscribers do not receive in-flight notifications by connecting under the main lock, preventing duplicate deliveries without pending count logic. NotificationItem and delivery logic were simplified to check suspension state at delivery time. Updated tests: removed PendingCount tests and added a test to verify no duplicate notifications during delivery. Improved comments and code clarity.
Add conditional logic for .NET 9.0+ in SwappableLock to handle both _gate and _lockGate fields. SwapTo now checks both fields for initialization and releases the appropriate lock type, ensuring compatibility with new locking mechanisms while preserving legacy behavior.
Previously, haveExpirationsChanged was overwritten by each call to TrySetExpiration, potentially losing information about prior changes. Now, the |= operator is used to ensure haveExpirationsChanged remains true if any expiration update occurs, preserving the correct state across multiple updates.
Moved _isDelivering reset from finally to catch block in DeliveryQueue<TItem>. Now, the flag is only reset when an exception occurs, and the exception is rethrown, making the error handling more explicit and preventing unnecessary state changes during normal execution.
The MultiThreadedStressTest asserts immediately after stress observables complete, but with drain-outside-lock delivery, Edit() returns after enqueueing while delivery may still be in-flight on another thread. Add a short delay before checking results to allow in-flight deliveries to complete. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
- Publish and explicitly connect merged observable in test, and await completion of all notifications for robust result verification. - Move _suspensionTracker disposal outside lock in ObservableCache to prevent deadlocks and reentrancy issues. - Add System.Reactive.Threading.Tasks import for ToTask() usage.
…ynamicData into bugfix/lock_inversion
Fix race where new subscribers could see duplicate Add notifications if they connect while in-flight changes are being delivered. Introduce a versioning mechanism in ObservableCache to track committed and delivered notifications, and skip already-delivered changes for new subscribers. Extend NotificationItem with a version field and add read-only lock support in DeliveryQueue. Update test to reliably reproduce and verify the fix.
Add comprehensive tests for nested and concurrent suspend/resume scenarios in SuspendNotificationsFixture. Emit resume signals under lock in ObservableCache to prevent race conditions and ensure consistent notification delivery. These changes enhance reliability and determinism of notification delivery under complex and concurrent usage patterns.
- Strengthen test reliability and clarify test names/messages - Rewrite DeliveryQueueFixture test for robust concurrency checks - Enhance ObservableCache to avoid duplicate/applied notifications - Refactor ResumeNotifications to prevent race conditions - Improve comments and code clarity throughout
SharedDeliveryQueue: type-erased multi-T delivery queue for operators. - List<IDrainable> of DeliverySubQueue<T> instances - Per-queue IObserver<T> delivery via Notification<T> struct - Drain loop: one item per iteration, fair across sources - ScopedAccess-only API on DeliverySubQueue<T> - ReadOnlyScopedAccess on SharedDeliveryQueue - Error terminates queue AFTER delivery (matches Rx contract) - Completion is per-sub-queue (does not terminate parent queue) - CreateQueue locked to prevent race with drain loop SynchronizeSafe(SharedDeliveryQueue) extension in DynamicData.Internal namespace (NOT System.Reactive.Linq to avoid overload resolution issues). Existing DeliveryQueue<T> unchanged — still used by ObservableCache. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Surgical 2-line change per operator: 1. var queue = new SharedDeliveryQueue(locker); 2. .Synchronize(locker) -> .SynchronizeSafe(queue) Operators migrated (28 files): - Joins: FullJoin, InnerJoin, LeftJoin, RightJoin - Sort: Sort, SortAndPage, SortAndVirtualize, SortAndBind - Paging: Page, Virtualise - Groups: GroupOn, GroupOnImmutable, GroupOnDynamic - Combine: DynamicCombiner, MergeChangeSets, MergeMany - Transform: TransformWithForcedTransform, TransformAsync, TransformMany - Lifecycle: DisposeMany, AsyncDisposeMany, OnBeingRemoved - Other: BatchIf, Switch, AutoRefresh, TreeBuilder, QueryWhenChanged - ObservableCacheEx: Bind (2 overloads) + ToObservableOptional MergeChangeSets: removed #if NET9_0_OR_GREATER block (SharedDeliveryQueue is same type on both TFMs). Operators NOT migrated (kept on Synchronize - local gate, no cross-cache risk): - ExpireAfter.ForSource, ExpireAfter.ForStream (timer callbacks use lock()) - CacheParentSubscription (complex EnterUpdate/ExitUpdate batching) - EditDiffChangeSetOptional (no lock needed) - SpecifiedGrouper (caches have internal locking) Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
CrossCacheDeadlockStressTest: bidirectional pipeline using Sort, Page, AutoRefresh, Transform, Filter, SubscribeMany, MergeMany, QueryWhenChanged, SortAndBind, Virtualise across two SourceCaches. 4 writer threads per cache plus a property updater thread. Proves no deadlock under concurrent load. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
CrossCacheDeadlockStressTest exercises every migrated operator in a
bidirectional multi-threaded pipeline:
Operators tested: Sort, Page, AutoRefresh, Transform, Filter,
SubscribeMany, MergeMany, MergeChangeSets, QueryWhenChanged,
SortAndBind, Virtualise, DisposeMany, GroupOn, GroupWithImmutableState,
FullJoin, InnerJoin, LeftJoin, TransformMany, BatchIf, Switch,
Or (DynamicCombiner)
Pipeline: cacheA -> Sort -> Page -> AutoRefresh -> Transform -> Filter
-> PopulateInto cacheB (forward)
cacheB -> Filter -> Transform -> PopulateInto cacheA (reverse)
+ cross-cache Join, MergeChangeSets, QueryWhenChanged, etc.
Load: 4 writer threads per cache (100 items each) + property updater
thread toggling BatchIf pause and Switch sources.
Verifies: item counts, sort order, virtualisation window, join results,
union correctness, batch delivery, group presence, transform counts.
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
…ronize calls SpecifiedGrouper: Synchronize(locker) -> SynchronizeSafe(queue) Remaining Synchronize calls (6 total, all proven safe): - EditDiffChangeSetOptional: Synchronize() with no arg (Rx's own gate) - ExpireAfter.ForSource/ForStream: local gate shared with timer lock() callbacks that emit directly — timer emission path would need refactoring to enqueue through queue. Local gate, no cross-cache deadlock risk. - TransformMany:109: per-item inner lock (new lock per Transform item) - CacheParentSubscription: reentrant batching requires Synchronize. Local gate. All downstream operators use SynchronizeSafe — deadlock chain broken. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
SwappableLock NET9 support -> bugfix/swappable-lock-net9 ExpireAfter race fix -> bugfix/expire-after-race Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
…ication KitchenSink_AllOperatorsChained_NoDeadlock_CorrectResults: - 7 pipeline chains exercising every dangerous operator - Monster chain: AutoRefresh -> Filter -> Sort -> Page -> Transform -> IgnoreSameReferenceUpdate -> WhereReasonsAre -> OnItemAdded/Updated/Removed -> SubscribeMany -> NotEmpty -> SkipInitial - Join chain: FullJoin -> Group -> DisposeMany -> MergeMany -> Transform - Individual: InnerJoin, LeftJoin, RightJoin with ChangeKey - Combined: MergeChangeSets, Or (DynamicCombiner), BatchIf, QueryWhenChanged - Binding: SortAndBind, Virtualise, GroupWithImmutableState - Dynamic: Switch, TransformMany - Bidirectional: PopulateInto both directions with recursive filter guards Load: 8 writer threads per cache, 500 items each, property mutations (AutoRefresh), removals, sort/page/virtual parameter changes, BatchIf toggles, Switch source swaps. Bogus Randomizer with deterministic seed. Validates: exact counts, sort order, join semantics, union correctness, virtualisation window, group counts, transform multiplicity. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Three instruction files for GitHub Copilot and AI assistants: 1. .github/copilot-instructions.md — General overview - What DynamicData is and why it matters - Why performance and Rx compliance are critical - Repository structure - Operator architecture pattern (extension method -> internal class -> Run()) - SharedDeliveryQueue pattern explanation - Breaking change policy 2. .github/instructions/rx-contracts.instructions.md — Rx contract rules - Serialized notifications (the reactivemarbles#1 rule) - Terminal notification semantics - Subscription lifecycle and disposal - DynamicData-specific rules (lock ordering, changeset immutability) - Link to ReactiveX contract reference 3. .github/instructions/dynamicdata-operators.instructions.md — Operator guide - Complete operator catalog with descriptions and examples - Categories: Filtering, Transformation, Sorting, Paging, Grouping, Joining, Combining, Aggregation, Fan-out/Fan-in, Lifecycle, Refresh, Buffering, Binding, Utilities - How to write a new operator Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
…rators Fix: MergeMany → MergeManyChangeSets for group sub-cache fan-out (MergeMany returns IObservable<T>, MergeManyChangeSets returns IObservable<IChangeSet<T,K>> — the latter is what we need here). Added operators (Pipeline 8-11): - And, Except, Xor (remaining set operations) - TransformOnObservable - FilterOnObservable - TransformWithInlineUpdate - DistinctValues - ToObservableChangeSet (bridges IObservable<T> into DD) - MergeMany (kept separately from MergeManyChangeSets) - Bind (ReadOnlyObservableCollection) - OnItemRefreshed, ForEachChange - DeferUntilLoaded All with exact final state assertions. Results: - Feature branch: PASSES in ~5s - main branch: DEADLOCKS at 30s timeout (proven) Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Rewrite CrossCacheDeadlockStressTest for exact, verifiable results:
Writers now use explicit non-overlapping ID ranges instead of Faker:
- CacheA: IDs 1..4000 (8 threads × 500, family=(id%5))
- CacheB: IDs 10001..14000 (same pattern)
- No random removals during writes (was non-deterministic)
Post-write deterministic mutations:
- Toggle IncludeInResults=false for id%10==5 (400 items)
- Remove from A: id%20==0 (200 mammals removed)
- Remove from B: id%15==0 in range 10001..14000 (267 removed)
Bidirectional pipeline fixed:
- PopulateInto → ForEachChange+AddOrUpdate (respects target key selector)
- Forward: 600 surviving mammals → B with id+800_000
- Reverse: 600 fwd items → A with id+1_700_000
- Cycle-breaking: forward only accepts name.StartsWith('fwd-A'),
reverse only accepts name.StartsWith('fwd-A')
All 30+ assertions are now hardcoded exact values:
- CacheA: 4400 (3800 direct + 600 reverse)
- CacheB: 4333 (3733 direct + 600 forward)
- FullJoin: produces results (disjoint keys)
- InnerJoin: 0 (disjoint key ranges)
- LeftJoin: 4400, RightJoin: 4333
- MergeChangeSets/Or/Xor: 8733 (A+B, disjoint)
- And: 0, Except: 4400
- FilterOnObservable(Mammal): 1200 (600 direct + 600 reverse)
- TransformMany: 8800 (2× cacheA)
- Virtualise: 50 (window size)
- DistinctValues/Groups: 5 (all AnimalFamily values)
- SortAndBind: 4400, sorted ascending by Id
- Forward items in B: 600, Reverse items in A: 600
Deadlocks on main (30s timeout), passes in ~7s on this branch.
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
…iveryQueue DisposeMany, AsyncDisposeMany, and OnBeingRemoved are single-source operators — they only serialize one IObservable<IChangeSet<T,K>>. Using the type-erased SharedDeliveryQueue (with List<IDrainable>) was unnecessary overhead for these cases. Now they use DeliveryQueue<Notification<T>> directly, which: - Eliminates type-erasure overhead (no IDrainable interface, no List) - Delivery callback is set at construction, not via sub-queue creation - Same AcquireReadLock() for disposal synchronization Added SynchronizeSafe<T>(DeliveryQueue<Notification<T>>) overload that returns IDisposable (not IObservable<T>) — delivery happens through the queue's callback, not Rx composition. All 37 related tests pass (DisposeMany, AsyncDisposeMany, stress test). Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Major infrastructure unification: DeliveryQueue<T> now: - Internally stores Queue<Notification<T>> (was Queue<TItem>) - Delivers to IObserver<T> (was Func<TItem, bool> callback) - ScopedAccess exposes Enqueue/EnqueueError/EnqueueCompleted (aligned with SharedDeliveryQueue's DeliverySubQueue API) - Terminal handling via Notification<T>.IsTerminal (was bool return) ObservableCache: - Deleted NotificationKind enum and NotificationItem record struct - Added CacheUpdate record struct (Changes?, Count, Version) - Added CacheUpdateObserver : IObserver<CacheUpdate> that dispatches to _changes, _changesPreview, _countChanged subjects - Terminal notifications go through queue's EnqueueCompleted/EnqueueError (no longer encoded in the payload type) Operators (DisposeMany, AsyncDisposeMany, OnBeingRemoved): - DeliveryQueue<IChangeSet<T,K>> directly (was DeliveryQueue<Notification<...>>) - No namespace-qualified Notification references needed SynchronizeSafeExtensions: - DeliveryQueue<T> overload (was DeliveryQueue<Notification<T>>) - Sets observer via SetObserver, uses Enqueue/EnqueueError/EnqueueCompleted DeliveryQueueFixture: - Rewritten for IObserver<T> pattern - Added ListObserver, ConcurrentObserver, DelegateObserver helpers - Terminal tests use EnqueueCompleted/EnqueueError 2233 tests passed, 0 failed (1 pre-existing flaky test). Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
…che() With SynchronizeSafe, the gate lock is released BEFORE downstream delivery. LockFreeObservableCache (used by AsObservableCache(false)) has no internal locking — it relied on the caller's Synchronize gate being held during the entire delivery chain. With SynchronizeSafe, a Connect() subscriber starting on another thread can overlap with delivery, causing 'Collection was modified' during enumeration of the internal ChangeAwareCache dictionary. Fix: Use locked AsObservableCache() (defaults to true) in all four join operators (InnerJoin, FullJoin, LeftJoin, RightJoin). This adds proper internal synchronization to the intermediate caches. Verified: 0/20 failures on InnerJoinFixtureRaceCondition (was ~10%). Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
MergeMany has N child observables but they're all the same type TDestination. No need for type-erased SharedDeliveryQueue. Children enqueue directly via AcquireLock/Enqueue on the shared typed queue instead of going through SynchronizeSafe per-child. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
DeliveryQueue<T> now has public OnNext/OnError/OnCompleted methods
(implementing IObserver<T>) that acquire the lock, enqueue, and drain.
This enables natural Rx patterns:
child.Subscribe(queue) // queue as observer
child.Subscribe(queue.OnNext, …) // selective forwarding
MergeMany now reads naturally:
_observableSelector(t, key)
.Finally(() => counter.Finally())
.Subscribe(queue.OnNext, static _ => { });
SynchronizeSafe(DeliveryQueue<T>) simplified to:
queue.SetObserver(observer);
return source.Subscribe(queue);
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
- Add ForceTerminate() to DeliveryQueue/SharedDeliveryQueue for safe, immediate queue termination - Track delivery thread to avoid deadlocks during termination - Update SynchronizeSafe to create queues internally and expose queue via out parameter - Replace manual disposal in DisposeMany with KeyedDisposable and AddIfDisposable extension - Simplify OnBeingRemoved and ObservableCacheEx to use new queue pattern - Remove manual lock/disposal logic in favor of ForceTerminate - Improve comments, documentation, and add KeyedDisposableExtensions - Overall, improve safety, efficiency, and usability in multi-threaded scenarios
…liveryComplete Infrastructure: - DeliveryQueue<T>: IObserver<T>, Notification<T> internal, EnsureDeliveryComplete with _drainThreadId for re-entrant safety - SharedDeliveryQueue: EnsureDeliveryComplete with same pattern - Both: _isDelivering volatile (spin-wait visibility) - Both: _drainThreadId initialized to -1 (defensive) - SetObserver: double-call guard SynchronizeSafe overloads: - SynchronizeSafe(locker) — drop-in for Synchronize(locker), implicit DeliveryQueue - SynchronizeSafe(locker, out queue) — exposes queue for EnsureDeliveryComplete - SynchronizeSafe(SharedDeliveryQueue) — multi-source ObservableCache: - CacheUpdate struct + CacheUpdateObserver : IObserver<CacheUpdate> - Deleted NotificationItem/NotificationKind Operators: - DisposeMany: KeyedDisposable + EnsureDeliveryComplete - AsyncDisposeMany: same pattern - OnBeingRemoved: surgical SynchronizeSafe(locker, out queue) - MergeMany: DeliveryQueue<T> with queue.OnNext - Join operators: AsObservableCache(false) → AsObservableCache() - ObservableCacheEx: 2 single-source Adapt operators simplified KeyedDisposable: - Same-reference guard in Add - AddIfDisposable extension method Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Fixes for issues introduced by our changes: 1. [High] DeliveryQueue: set _isTerminated BEFORE Accept for terminal notifications, not after. Prevents race where concurrent code (e.g., InvokePreview) sees IsTerminated==false while terminal delivery is in-flight. (Found by: GPT-5.2 rx-expert-cache) 2. [Medium] KeyedDisposable pre-NET6 Remove: swap to remove-then-dispose matching NET6+ branch. Prevents double-dispose on re-entrant Dispose callbacks. (Found by: Claude Opus 4.6 bughunt-infra) 3. [Medium] KeyedDisposable.Dispose: per-item try/catch with AggregateException. Prevents leaking remaining disposables if one throws. (Found by: Claude Opus 4.6 bughunt-infra) 4. [Medium] Remove duplicate 'using DynamicData.Internal' in MergeChangeSets.cs and GroupOnDynamic.cs. (Found by: Claude Opus 4.5 concurrency-expert) Pre-existing issues noted but NOT fixed (existed before our changes): - Watch() error handling (GPT-5.2) - DynamicCombiner/TreeBuilder unsynchronized handlers (Opus 4.5) - Suspended Connect/Watch double OnCompleted (GPT-5.2) - Sub-queue per-instance terminal tracking (GPT-5.4) Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
New test fixtures: - KeyedDisposableFixture (12 tests): Add, Remove, same-reference guard, AddIfDisposable for disposable/non-disposable items, Dispose aggregates exceptions, idempotent Dispose, Add-after-Dispose immediate disposal - SharedDeliveryQueueFixture (6 tests): single source delivery, multi-source serialization, error terminates all sub-queues, completion does NOT terminate parent, EnsureDeliveryComplete, concurrent multi-source - DeliveryQueueFixture additions (7 tests): EnsureDeliveryComplete terminates, clears pending items, re-entrant from drain thread (no deadlock), spin-waits for in-flight delivery, terminal items delivered before termination, error terminates and clears pending Total: 39 new infrastructure tests, all passing. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
This PR replaces #1074 and provides a more robust fix
The Problem
Many DynamicData cache operators use
Synchronize(lock)to serialize concurrent access. This acquires a lock and holds it during the entire downstreamobserver.OnNext()delivery. When a pipeline crosses cache boundaries (one cache's output feeds into another cache), the downstream cache's lock can be acquired while the upstream operator's lock is still held. If two threads follow reverse paths through the same pair of caches, this creates a classic ABBA deadlock.This is not a theoretical issue because any application that composes complex DynamicData pipelines across multiple
SourceCacheinstances with concurrent writers can deadlock.The Solution: Queue-Drain Delivery
Replace
Synchronize(lock)with a queue-drain pattern that releases the lock before delivering to downstream subscribers. This eliminates ABBA deadlocks because no operator lock is held during cross-cache delivery.Two Delivery Queue Classes
DeliveryQueue<T>: typed, single-source queue. ImplementsIObserver<T>.Queue<Notification<T>>(value/error/completed)_isDelivering) ensures Rx serialization contractMergeMany(N children of the same type),DisposeMany/OnBeingRemoved(needEnsureDeliveryCompletefor disposal), andObservableCacheitself (the core fix)EnsureDeliveryComplete()with_drainThreadIdre-entrant safety for disposal callbacksSharedDeliveryQueue: type-erased, multi-source queue.DeliverySubQueue<T>viaCreateQueue<T>(observer)Drop-In Replacement:
SynchronizeSafeThree extension method overloads in
SynchronizeSafeExtensionsprovide surgical drop-in replacements forSynchronize(lock):Per-Operator Changes
Each operator change is surgical (typically 2 lines changed):
Multi-source operators (SharedDeliveryQueue)
Replace
var locker = InternalEx.NewLock()+source.Synchronize(locker)withvar queue = new SharedDeliveryQueue(locker)+source.SynchronizeSafe(queue):Synchronize(locker)→SynchronizeSafe(queue)× 2Synchronize(locker)→SynchronizeSafe(queue)× 3Synchronize(locker)→SynchronizeSafe(queue)× 2Synchronize(locker)→SynchronizeSafe(queue)× 2Synchronize(locker)→SynchronizeSafe(queue)× 2Synchronize(locker)→SynchronizeSafe(queue)× 2Synchronize(locker)→SynchronizeSafe(queue)× 2Synchronize(locker)→SynchronizeSafe(queue)× 2Synchronize(locker)→SynchronizeSafe(queue)× 2Synchronize(locker)→SynchronizeSafe(queue)× 3Synchronize(locker)→SynchronizeSafe(queue)× 2Synchronize(locker)→SynchronizeSafe(queue)× 2Synchronize(locker)→SynchronizeSafe(queue)× 2Synchronize(locker)→SynchronizeSafe(queue)× 2Synchronize(locker)→SynchronizeSafe(queue)× 2Synchronize(locker)→SynchronizeSafe(queue)× 3Synchronize(locker)→SynchronizeSafe(queue)× 3Synchronize(locker)→SynchronizeSafe(queue)× 2Synchronize(locker)→SynchronizeSafe(queue)× 2Synchronize(locker)→SynchronizeSafe(queue)× 2Synchronize(locker)→SynchronizeSafe(queue)× 2 (outer lock)Synchronize(locker)→SynchronizeSafe(queue)× 2Synchronize(locker)→SynchronizeSafe(queue)× 3Synchronize(locker)→SynchronizeSafe(queue)× 2Single-source operators (DeliveryQueue)
SynchronizeSafe(locker, out queue)+EnsureDeliveryCompletefor disposalSynchronizeSafe(locker, out queue)+EnsureDeliveryCompleteDeliveryQueue<TDestination>(locker, observer)+queue.OnNextObservableCache (the core fix)
CacheUpdaterecord struct +CacheUpdateObserver : IObserver<CacheUpdate>. DeliveryQueue drains notifications outside_locker.ObservableCacheEx (Adapt overloads)
Adaptoverloads:Synchronize(locker)→SynchronizeSafe(locker)Not Changed
CacheParentSubscription: uses
Synchronize(_synchronize)for reentrant batching (EnterUpdate/ExitUpdate pattern). Converting to queue-drain would require significant restructuring of the batching model. Used by MergeManyChangeSets, TransformOnObservable, GroupOnObservable, TransformManyAsync.Static Combiner (
Combiner<T,K>) used by the fixed-list Or/And/Except/Xor overloads. Uses a simplelock(_locker)pattern that doesn't hold the lock duringobserver.OnNext()(the callback is called outside the lock). No deadlock risk.Supporting Infrastructure
Internal/DeliveryQueue.csIObserver<T>Internal/SharedDeliveryQueue.csInternal/SynchronizeSafeExtensions.csInternal/Notification.csAccept(IObserver<T>)Internal/KeyedDisposable.csInternal/KeyedDisposableExtensions.csAddIfDisposable<TKey, TItem>extensionTests
Infrastructure Tests
DeliveryQueueFixture_drainThreadId, terminal notification ordering, concurrent accessSharedDeliveryQueueFixtureKeyedDisposableFixtureStress Test
CrossCacheDeadlockStressTestRandomizer(42). Custom domain types with INPC (AutoRefresh) and IDisposable (DisposeMany). Both MergeManyChangeSets overloads (child-comparer and source+child-comparer). Every operator appears 2+ times. Exact content verification withBeEquivalentTo. Would deadlock if any two co-located operators were reverted.Existing Test Fixes
InnerJoinFixtureAsObservableCache(false)→AsObservableCache(): LockFreeObservableCache races with SynchronizeSafe deliveryMergeManyChangeSetsCacheSourceCompareFixture