Files
Disco/Disco.Services/Tasks/ScheduledTaskNotificationsHub.cs
T
Gary Sharp 4cd57f4a90 Update: SignalR 2.0.3 Migration; Noticeboards
Migrate all SignalR 1.x Persistent Connections to SignalR 2.x Hubs.
Abstracts ScheduledTaskStatus with core interface and adds a Mock for
optional status reporting. Noticeboards rewritten (with new theme) to be
more resilient and accurate.
2014-06-01 23:27:07 +10:00

90 lines
3.3 KiB
C#

using Disco.Services.Web.Signalling;
using Microsoft.AspNet.SignalR;
using Microsoft.AspNet.SignalR.Hubs;
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Linq;
namespace Disco.Services.Tasks
{
using System.Reactive.Subjects;
using ChangedItem = KeyValuePair<string, object>;
[HubName("scheduledTaskNotifications"), DiscoHubAuthorize()]
public class ScheduledTaskNotificationsHub : Hub
{
private const string NotificationsPrefix = "Logging_";
private static Subject<Tuple<string, IEnumerable<ChangedItem>>> taskUpdatesStream = new Subject<Tuple<string, IEnumerable<ChangedItem>>>();
private static IDisposable taskUpdatesStreamSubscription;
internal static void Initialize()
{
if (taskUpdatesStreamSubscription == null)
{
lock (taskUpdatesStream)
{
if (taskUpdatesStreamSubscription == null)
{
taskUpdatesStreamSubscription = taskUpdatesStream
.DelayBuffer(TimeSpan.FromMilliseconds(200))
.Subscribe(BroadcastBufferedEvents);
}
}
}
}
internal static void PublishEvent(string TaskSessionId, IEnumerable<ChangedItem> ChangedItems)
{
taskUpdatesStream.OnNext(Tuple.Create(TaskSessionId, ChangedItems));
}
public override System.Threading.Tasks.Task OnConnected()
{
var taskSessionId = Context.QueryString["TaskSessionId"];
if (string.IsNullOrEmpty(taskSessionId))
throw new ArgumentNullException("TaskSessionId");
var status = ScheduledTasks.GetTaskStatus(taskSessionId);
if (status == null)
throw new ArgumentException("Invalid ScheduledTask SessionId", "TaskSessionId");
// Send Status:
var currentStatus = ScheduledTaskStatusLive.FromScheduledTaskStatus(status, null);
Clients.Caller.initializeTaskStatus(currentStatus);
// Add to Group
var groupName = NotificationsPrefix + taskSessionId;
Groups.Add(Context.ConnectionId, groupName);
return base.OnConnected();
}
private static void BroadcastBufferedEvents(IEnumerable<Tuple<string, IEnumerable<ChangedItem>>> Events)
{
var connectionManager = GlobalHost.ConnectionManager;
var context = connectionManager.GetHubContext<ScheduledTaskNotificationsHub>();
var taskStatusEvents = Events.GroupBy(e => e.Item1).Select(taskEventsGroup =>
{
Dictionary<string, object> changes = new Dictionary<string, object>();
foreach (var changeEvents in taskEventsGroup.Select(taskEvents => taskEvents.Item2))
foreach (var changeEvent in changeEvents)
changes[changeEvent.Key] = changeEvent.Value;
return Tuple.Create(taskEventsGroup.Key, changes);
});
foreach (var taskStatusEvent in taskStatusEvents)
{
var groupName = NotificationsPrefix + taskStatusEvent.Item1;
context.Clients.Group(groupName).updateTaskStatus(taskStatusEvent.Item2);
}
}
}
}