using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Data.Entity.Infrastructure;
using System.Data.Objects;
using System.Linq;
using System.Reactive.Subjects;
using Disco.Models.Repository;

namespace Disco.Data.Repository.Monitor
{
    /// <summary>
    /// Publishes a <see cref="RepositoryMonitorEvent"/> for every added, modified or deleted
    /// entity tracked by a <see cref="DiscoDataContext"/>: once on the "before commit" stream
    /// (from <see cref="BeforeSaveChanges"/>) and once on the "after commit" stream
    /// (from <see cref="AfterSaveChanges"/>).
    /// </summary>
    public static class RepositoryMonitor
    {
        // Namespace prefix used to recognise the real model type underneath a proxy type.
        private const string RepositoryModelNamespace = "Disco.Models.Repository";

        private static readonly Subject<RepositoryMonitorEvent> streamBefore = new Subject<RepositoryMonitorEvent>();
        private static readonly Subject<RepositoryMonitorEvent> streamAfter = new Subject<RepositoryMonitorEvent>();

        // Maps a runtime entity type (possibly a proxy) to the model type resolved for it,
        // so the base-type walk in EntityTypeFromProxy only happens once per runtime type.
        private static readonly ConcurrentDictionary<Type, Type> entityProxyTypeCache = new ConcurrentDictionary<Type, Type>();

        /// <summary>Stream that receives each event created by <see cref="BeforeSaveChanges"/>.</summary>
        public static Subject<RepositoryMonitorEvent> StreamBeforeCommit => streamBefore;

        /// <summary>Stream that receives each event passed to <see cref="AfterSaveChanges"/>.</summary>
        public static Subject<RepositoryMonitorEvent> StreamAfterCommit => streamAfter;

        /// <summary>
        /// Detects pending changes on <paramref name="Database"/>, builds one event per
        /// Added/Deleted/Modified entry and pushes each event to the before-commit stream.
        /// </summary>
        /// <param name="Database">The context whose change tracker is inspected.</param>
        /// <returns>The events that were published, in change-tracker order.</returns>
        internal static RepositoryMonitorEvent[] BeforeSaveChanges(DiscoDataContext Database)
        {
            var contextStateManager = ((IObjectContextAdapter)Database).ObjectContext.ObjectStateManager;

            Database.ChangeTracker.DetectChanges();

            var changes = Database.ChangeTracker.Entries().Where(entry =>
                entry.State == System.Data.EntityState.Added ||
                entry.State == System.Data.EntityState.Deleted ||
                entry.State == System.Data.EntityState.Modified);

            // Explicit loop rather than Select(): publishing to the stream is a side effect
            // and should not hide inside a LINQ projection.
            var events = new List<RepositoryMonitorEvent>();
            foreach (var entry in changes)
            {
                ObjectStateEntry stateEntry = contextStateManager.GetObjectStateEntry(entry.Entity);
                var monitorEvent = EventFromEntryState(Database, entry, stateEntry);

                // Push to Stream
                streamBefore.OnNext(monitorEvent);

                events.Add(monitorEvent);
            }

            return events.ToArray();
        }

        /// <summary>
        /// Finalises each event (see <see cref="UpdateAfterEventFromEntryState"/>) and pushes it
        /// to the after-commit stream.
        /// </summary>
        /// <param name="Database">The context the changes belong to (not used by this method).</param>
        /// <param name="changes">Events to finalise and publish; presumably those returned by
        /// <see cref="BeforeSaveChanges"/> - confirm against the caller.</param>
        internal static void AfterSaveChanges(DiscoDataContext Database, IEnumerable<RepositoryMonitorEvent> changes)
        {
            foreach (var change in changes)
            {
                UpdateAfterEventFromEntryState(change);
                streamAfter.OnNext(change);
            }
        }

        /// <summary>
        /// Resolves the model type for a runtime entity type by walking up the inheritance
        /// chain until a type whose namespace starts with "Disco.Models.Repository" is found.
        /// Successful lookups are cached.
        /// </summary>
        /// <param name="EntityProxyType">The runtime type of the entity (may be a proxy type).</param>
        /// <returns>The first type in the chain that lives in the repository model namespace.</returns>
        /// <exception cref="ArgumentException">No type in the inheritance chain is a repository model.</exception>
        private static Type EntityTypeFromProxy(Type EntityProxyType)
        {
            Type EntityType;
            if (entityProxyTypeCache.TryGetValue(EntityProxyType, out EntityType))
            {
                return EntityType;
            }

            EntityType = EntityProxyType;
            do
            {
                // Type.Namespace is null for types declared outside any namespace; treat
                // that as "not a repository model". Ordinal: this is an identifier match.
                var typeNamespace = EntityType.Namespace;
                if (typeNamespace != null && typeNamespace.StartsWith(RepositoryModelNamespace, StringComparison.Ordinal))
                {
                    entityProxyTypeCache.TryAdd(EntityProxyType, EntityType);
                    return EntityType;
                }

                EntityType = EntityType.BaseType;
            } while (EntityType != null);

            throw new ArgumentException("The EntryProxyType does not inherit from any Repository Models", nameof(EntityProxyType));
        }

        /// <summary>
        /// Marks an event as after-commit, fills in the entity key for Added events and runs
        /// any deferred actions registered on the event.
        /// </summary>
        /// <param name="monitorEvent">The event to update.</param>
        internal static void UpdateAfterEventFromEntryState(RepositoryMonitorEvent monitorEvent)
        {
            monitorEvent.afterCommit = true;

            if (monitorEvent.EventType == RepositoryMonitorEventType.Added)
            {
                // Update Entity Key for Added Events (it is left null when the event is created)
                monitorEvent.EntityKey = DetermineEntityKey(monitorEvent.objectEntryState);
            }

            // Execute Deferred Actions
            if (monitorEvent.executeAfterCommit != null)
            {
                foreach (var deferredAction in monitorEvent.executeAfterCommit)
                {
                    deferredAction.Invoke(monitorEvent);
                }
            }
        }

        /// <summary>
        /// Builds a <see cref="RepositoryMonitorEvent"/> describing the given tracked entry.
        /// </summary>
        /// <param name="Database">The context the entry belongs to.</param>
        /// <param name="entityEntry">The DbContext-level entry for the entity.</param>
        /// <param name="entryState">The ObjectContext-level state entry for the same entity.</param>
        /// <returns>
        /// An event whose <c>ModifiedProperties</c> is populated only for Modified entries
        /// (empty otherwise) and whose <c>EntityKey</c> is null for Added entries.
        /// </returns>
        /// <exception cref="NotSupportedException">The entry state is not a recognised value.</exception>
        internal static RepositoryMonitorEvent EventFromEntryState(DiscoDataContext Database, DbEntityEntry entityEntry, ObjectStateEntry entryState)
        {
            RepositoryMonitorEventType eventType = EventTypeFromEntityState(entryState.State);
            Type entityType = EntityTypeFromProxy(entryState.Entity.GetType());

            // Only pass modified properties on Modified Event; empty array otherwise.
            string[] modifiedProperties = eventType == RepositoryMonitorEventType.Modified
                ? entryState.GetModifiedProperties().ToArray()
                : new string[0];

            // Don't pass entity key when entity newly added; it is filled in by
            // UpdateAfterEventFromEntryState once the commit has happened.
            Dictionary<string, object> entityKey = eventType != RepositoryMonitorEventType.Added
                ? DetermineEntityKey(entryState)
                : null;

            return new RepositoryMonitorEvent()
            {
                EventType = eventType,
                Entity = entryState.Entity,
                EntityKey = entityKey,
                EntityType = entityType,
                ModifiedProperties = modifiedProperties,
                Database = Database,
                dbEntityState = entityEntry,
                objectEntryState = entryState
            };
        }

        /// <summary>Maps an Entity Framework entity state to the matching monitor event type.</summary>
        private static RepositoryMonitorEventType EventTypeFromEntityState(System.Data.EntityState state)
        {
            switch (state)
            {
                case System.Data.EntityState.Added:
                    return RepositoryMonitorEventType.Added;
                case System.Data.EntityState.Deleted:
                    return RepositoryMonitorEventType.Deleted;
                case System.Data.EntityState.Detached:
                    return RepositoryMonitorEventType.Detached;
                case System.Data.EntityState.Modified:
                    return RepositoryMonitorEventType.Modified;
                case System.Data.EntityState.Unchanged:
                    return RepositoryMonitorEventType.Unchanged;
                default:
                    throw new NotSupportedException($"Database Entry State not supported: {state}");
            }
        }

        /// <summary>
        /// Builds the key dictionary for an entry from its <c>EntityKey</c> values, then adds
        /// the owning-record identifiers for the child entity types listed below.
        /// </summary>
        /// <param name="entryState">The state entry whose key is read.</param>
        /// <returns>Key member names mapped to their values, plus any added parent identifiers.</returns>
        internal static Dictionary<string, object> DetermineEntityKey(ObjectStateEntry entryState)
        {
            Dictionary<string, object> key = entryState.EntityKey.EntityKeyValues.ToDictionary(kv => kv.Key, kv => kv.Value);
            object entity = entryState.Entity;

            // Independent checks (not else-if), matching the original evaluation order.
            if (entity is DeviceAttachment)
            {
                key["DeviceSerialNumber"] = ((DeviceAttachment)entity).DeviceSerialNumber;
            }
            if (entity is DeviceCertificate)
            {
                key["DeviceSerialNumber"] = ((DeviceCertificate)entity).DeviceSerialNumber;
            }
            if (entity is DeviceComponent)
            {
                key["DeviceModelId"] = ((DeviceComponent)entity).DeviceModelId;
            }
            if (entity is DeviceUserAssignment)
            {
                key["AssignedUserId"] = ((DeviceUserAssignment)entity).AssignedUserId;
            }
            if (entity is JobAttachment)
            {
                key["JobId"] = ((JobAttachment)entity).JobId;
            }
            if (entity is JobComponent)
            {
                key["JobId"] = ((JobComponent)entity).JobId;
            }
            if (entity is JobLog)
            {
                key["JobId"] = ((JobLog)entity).JobId;
            }
            if (entity is UserAttachment)
            {
                key["UserId"] = ((UserAttachment)entity).UserId;
            }
            if (entity is UserFlagAssignment)
            {
                key["UserFlagId"] = ((UserFlagAssignment)entity).UserFlagId;
                key["UserId"] = ((UserFlagAssignment)entity).UserId;
            }

            return key;
        }
    }
}