En Rx, ¿cómo agrupar eventos por ID y controlar cada grupo por múltiples TimeSpans?

Me metí en una ola de Rx, por así decirlo, y esta pregunta está relacionada con la mía aquí y aquí . Sin embargo, tal vez sean de ayuda para alguien, ya que podría verlos como variaciones útiles del mismo tema.

Pregunta: ¿Cómo podría uno agrupar un flujo aleatorio de objetos int (por ejemplo, en el intervalo [0, 10] producido en un intervalo aleatorio) en grupos y proporcionar al grupo de búsqueda un número variable de alarmas de ausencia de eventos (por la falta de una mejor definición, para mayor información ver publicaciones vinculadas). Más específicamente con el código, ¿cómo podría uno definir la configuración del acelerador de multipe por grupo en lo siguiente:

 var idAlarmStream = idStream .Select(i => i) .GroupByUntil(key => key.Id, grp => grp.Throttle(Timespan.FromMilliseconds(1000)) .SelectMany(grp => grp.TakeLast(1)) .Subscribe(i => Console.WriteLine(i)); 

Aquí se llamará a la función de suscripción si hay más de un segundo de ausencia de ID por grupo. ¿Qué sucede si uno quisiera definir tres valores diferentes para la ausencia de eventos (por ejemplo, un segundo, cinco segundos y diez segundos) y todos se cancelan cuando llega un evento? Lo que se me ocurre son:

  • Divida cada ID en idStream en varios sintéticos y proporcione un mapeo biyectivo entre los ID reales y los sintéticos. Por ejemplo, en este caso ID: 1 -> 100, 101, 102; ID: 2 -> 200, 201, 203 y luego defina una función selectora en Throttle como en Func(i => /* switch(i)...*/) y luego cuando se llame a Subscribe , mapear la ID de nuevo. Consulte también las preguntas vinculadas para obtener más información.
  • Cree una agrupación anidada en la que se agrupen los ID y luego los grupos de ID se copiarán / replicarán / bifurcarán (no conozco el término adecuado) en grupos de acuerdo con los valores de regulación. Creo que este enfoque es bastante complicado y no estoy seguro de si sería el mejor. Sin embargo, estoy seguro de que estaría interesado en ver tal consulta.

En un contexto más general, sospecho, esta es una situación en la que hay varios manejadores por grupo, aunque no he logrado encontrar nada relacionado con esto.

<edit: Como (con suerte, aclarando) un ejemplo idStream empuja una ID: 1 sobre la cual se iniciarían tres contadores diferentes, cada uno de los cuales espera que ocurra el siguiente evento o es alarmante si no se detecta una nueva ID a tiempo. El contador 1 (C1) espera cinco segundos, el contador 2 (C2) durante siete segundos y el contador 3 (C3) durante diez segundos. Si se recibe una nueva ID 1 dentro del intervalo [0, 5] segundos, todos los contadores se reiniciarán con los valores mencionados anteriormente y no se enviará ninguna alarma. Si se recibe una nueva ID dentro del intervalo [0, 7) segundos, las alarmas C1 y C2 y C3 se reiniciarán. De manera similar, si se recibe una nueva ID dentro de un intervalo [0, 10) segundos, C1 y C2 se disparan, pero C3 se reinicializa.

Es decir, habría múltiples “alarmas de ausencia” o, en general, acciones tomadas, hacia una ID en ciertas condiciones. No estoy seguro de lo que sería un buen análogo … Quizás astackr “luces de alerta” en una torre para que primero sea verde, luego amarilla y por último roja. A medida que la ausencia de una identificación continúa más y más, un color tras otro se iluminará (en este caso, el rojo es el último). Luego, cuando se detecta una identificación, todas las luces se apagarán.

<edit 2: Al adaptar el código de James al ejemplo siguiente y dejar el rest como está escrito, descubrí que el Subscribe se llamará directamente al primer evento en los dos niveles de alarma.

 const int MaxLevels = 2; var idAlarmStream = idStream .Select(i => i) .AlarmSystem(keySelector, thresholdSelector, MaxLevels, TaskPoolScheduler.Default) .Subscribe(i => { Debug.WriteLine("Alarm on id \"{0}\" at {1}", i, DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff", CultureInfo.InvariantCulture)); }); 

Veamos qué está pasando aquí y si MaxLevels podría proporcionarse dinámicamente …

<edit 3: el código de James funciona. ¡El problema estaba entre la silla y el teclado! Cambiar el tiempo a algo más sensato seguro que ayudó. De hecho, los cambié a figuras más grandes, pero era .FromTicks y se me escapó por unos minutos.

Creo que esto funciona. Intentaré añadir una explicación más completa más adelante. Cada nivel de alarma tiene un umbral definido (por grupo de señal). Se espera que estos sean de mayor duración.

La idea básica es que las señales de todos los niveles anteriores se introduzcan en el nivel actual. El primer nivel es un nivel “cero” de las señales que se filtran antes de que se devuelva el flujo de alarma. Tenga en cuenta que las claves TSignal deben admitir la identidad de valor.

Estoy seguro de que hay espacio para la simplificación!

Prueba unitaria de muestra:

 public class AlarmTests : ReactiveTest { [Test] public void MultipleKeyMultipleSignalMultipleLevelTest() { var threshold1 = TimeSpan.FromTicks(300); var threshold2 = TimeSpan.FromTicks(800); var scheduler = new TestScheduler(); var signals = scheduler.CreateHotObservable( OnNext(200, 1), OnNext(200, 2), OnNext(400, 1), OnNext(420, 2), OnNext(800, 1), OnNext(1000, 1), OnNext(1200, 1)); Func keySelector = i => i; Func thresholdSelector = (key, level) => { if (level == 1) return threshold1; if (level == 2) return threshold2; return TimeSpan.MaxValue; }; var results = scheduler.CreateObserver>(); signals.AlarmSystem( keySelector, thresholdSelector, 2, scheduler).Subscribe(results); scheduler.Start(); results.Messages.AssertEqual( OnNext(700, new Alarm(1, 1)), OnNext(720, new Alarm(2, 1)), OnNext(1220, new Alarm(2, 2)), OnNext(1500, new Alarm(1, 1)), OnNext(2000, new Alarm(1, 2))); } [Test] public void CheckAlarmIsSuppressed() { var threshold1 = TimeSpan.FromTicks(300); var threshold2 = TimeSpan.FromTicks(500); var scheduler = new TestScheduler(); var signals = scheduler.CreateHotObservable( OnNext(200, 1), OnNext(400, 1), OnNext(600, 1)); Func keySelector = i => i; Func thresholdSelector = (signal, level) => { if (level == 1) return threshold1; if (level == 2) return threshold2; return TimeSpan.MaxValue; }; var results = scheduler.CreateObserver>(); signals.AlarmSystem( keySelector, thresholdSelector, 2, scheduler).Subscribe(results); scheduler.Start(); results.Messages.AssertEqual( OnNext(900, new Alarm(1, 1)), OnNext(1100, new Alarm(1, 2))); } } public static class ObservableExtensions { ///  /// Create an alarm system that detects signal gaps of length /// determined by a signal key and signals alarms of increasing severity. ///  /// Type of the signal /// Type of the signal key used for grouping, must implement Equals correctly /// Input signal stream /// Function to select a key from a signal for grouping /// Function to select a threshold for a given signal key and alarm level. /// Should return TimeSpan.MaxValue for levels above the highest level /// Number of alarm levels /// Scheduler use for throttling /// A stream of alarms each of which contains the signal and alarm level public static IObservable> AlarmSystem( this IObservable signals, Func keySelector, Func thresholdSelector, int levels, IScheduler scheduler) { var alarmSignals = signals.Select(signal => new Alarm(signal, 0)) .Publish() .RefCount(); for (int i = 0; i < levels; i++) { alarmSignals = alarmSignals.CreateAlarmSystemLevel( keySelector, thresholdSelector, i + 1, scheduler); } return alarmSignals.Where(alarm => alarm.Level != 0); } private static IObservable> CreateAlarmSystemLevel( this IObservable> alarmSignals, Func keySelector, Func thresholdSelector, int level, IScheduler scheduler) { return alarmSignals .Where(alarmSignal => alarmSignal.Level == 0) .Select(alarmSignal => alarmSignal.Signal) .GroupByUntil( keySelector, grp => grp.Throttle(thresholdSelector(grp.Key, level), scheduler)) .SelectMany(grp => grp.TakeLast(1).Select(signal => new Alarm(signal, level))) .Merge(alarmSignals); } } public class Alarm : IEquatable> { public Alarm(TSignal signal, int level) { Signal = signal; Level = level; } public TSignal Signal { get; private set; } public int Level { get; private set; } private static bool Equals(Alarm x, Alarm y) { if (ReferenceEquals(x, null)) return false; if (ReferenceEquals(y, null)) return false; if (ReferenceEquals(x, y)) return true; return x.Signal.Equals(y.Signal) && x.Level.Equals(y.Level); } // Equality implementation added to help with testing. public override bool Equals(object other) { return Equals(this, other as Alarm); } public override string ToString() { return string.Format("Signal: {0} Level: {1}", Signal, Level); } public bool Equals(Alarm other) { return Equals(this, other); } public static bool operator ==(Alarm x, Alarm y) { return Equals(x, y); } public static bool operator !=(Alarm x, Alarm y) { return !Equals(x, y); } public override int GetHashCode() { return ((Signal.GetHashCode()*37) ^ Level.GetHashCode()*329); } }