Operaciones asíncronas de caching.

Estoy buscando una forma elegante de almacenar en caché los resultados de mis operaciones asíncronas.

Primero tuve un método sincrónico como este:

public String GetStuff(String url) { WebRequest request = WebRequest.Create(url); using (var response = request.GetResponse()) using (var sr = new StreamReader(response.GetResponseStream())) return sr.ReadToEnd(); } 

Entonces lo hice asíncrono:

 public async Task GetStuffAsync(String url) { WebRequest request = WebRequest.Create(url); using (var response = await request.GetResponseAsync()) using (var sr = new StreamReader(response.GetResponseStream())) return await sr.ReadToEndAsync(); } 

Entonces decidí que debería almacenar en caché los resultados, por lo que no necesito consultar fuera de eso a menudo:

 ConcurrentDictionary _cache = new ConcurrentDictionary(); public async Task GetStuffAsync(String url) { return _cache.GetOrAdd(url, await GetStuffInternalAsync(url)); } private async Task GetStuffInternalAsync(String url) { WebRequest request = WebRequest.Create(url); using (var response = await request.GetResponseAsync()) using (var sr = new StreamReader(response.GetResponseStream())) return await sr.ReadToEndAsync(); } 

Luego leí un artículo (o vi un video) sobre cómo es mejor el almacenamiento en caché de la Task , porque crearlos es costoso:

 ConcurrentDictionary<String, Task> _cache = new ConcurrentDictionary<String, Task>(); public Task GetStuffAsync(String url) { return _cache.GetOrAdd(url, GetStuffInternalAsync(url)); } private async Task GetStuffInternalAsync(String url) { WebRequest request = WebRequest.Create(url); using (var response = await request.GetResponseAsync()) using (var sr = new StreamReader(response.GetResponseStream())) return await sr.ReadToEndAsync(); } 

Y ahora el problema es que si la solicitud falla (por ejemplo, un HTTP 401), la memoria caché contendrá una Task fallida y tendré que restablecer la aplicación porque será imposible reenviar la solicitud.

¿Existe una manera elegante de usar ConcurrentDictionary para almacenar en caché solo las tareas exitosas y aún tener el comportamiento atómico?

En primer lugar, ambos enfoques son incorrectos, porque no le guardan ninguna solicitud (aunque el segundo le ahorra tiempo al menos).

Tu primer código (el que está a la await ) hace esto:

  1. Haga la solicitud.
  2. Espere a que se complete la solicitud.
  3. Si ya hubo un resultado en el caché, ignore el resultado de la solicitud.

Su segundo código elimina el paso 2, por lo que es más rápido, pero aún está haciendo muchas solicitudes innecesarias.

Lo que debe hacer en su lugar es usar la sobrecarga de GetOrAdd() que toma un delegado :

 public Task GetStuffAsync(String url) { return _cache.GetOrAdd(url, GetStuffInternalAsync); } 

Esto no elimina completamente la posibilidad de que las solicitudes se ignoren, pero las hace mucho menos probables. (Para eso, podrías intentar cancelar solicitudes que sabes que están siendo ignoradas, pero no creo que valga la pena el esfuerzo aquí).


Ahora a su pregunta real. Lo que creo que deberías hacer es usar el método AddOrUpdate() . Si el valor aún no está allí, agréguelo. Si está allí, reemplácelo si está defectuoso:

 public Task GetStuffAsync(String url) { return _cache.AddOrUpdate( url, GetStuffInternalAsync, (u, task) => { if (task.IsCanceled || task.IsFaulted) return GetStuffInternalAsync(u); return task; }); } 

En realidad, es razonable (y dependiendo de su diseño y rendimiento, crucial) mantener esas tareas fallidas como caché negativo . De lo contrario, si una url siempre falla, usarla una y otra vez elimina el punto de usar una memoria caché por completo.

Lo que necesitas es una forma de borrar el caché de vez en cuando. La forma más sencilla es tener un temporizador que reemplace la instancia de ConcurrentDictionarry . La solución más robusta es construir tu propio LruDictionary o algo similar.

He creado un contenedor para MemoryCache que básicamente almacena en caché objetos Lazy> y funciona para que se resuelvan todos los problemas siguientes:

  • No se iniciarán operaciones paralelas o innecesarias para obtener un valor. Varios sitios de llamada o subprocesos podrían esperar el mismo valor de la memoria caché.
  • Las tareas fallidas no se almacenan en caché. (No cacheo negativo.)
  • Los usuarios de caché no pueden obtener resultados invalidados de la caché, incluso si el valor se invalida durante una espera.

La solución se explica con más detalle en mi blog y el código de trabajo completo está disponible en GitHub .

Aquí hay una manera de almacenar en caché los resultados de las operaciones asíncronas que garantizan que no se pierda ninguna caché.

En la respuesta aceptada, si la misma url se solicita muchas veces en un bucle (según el SynchronizationContext) o desde varios subprocesos, la solicitud web continuará enviándose hasta que haya una respuesta que se almacena en caché, momento en el que la caché comenzará a recibir usado.

El siguiente método crea un objeto SemaphoreSlim para cada clave única. Esto evitará que la operación asíncrona de larga ejecución se ejecute varias veces para la misma clave, mientras que permite que se ejecute simultáneamente para diferentes claves. Obviamente, existe la sobrecarga de mantener los objetos de SemaphoreSlim para evitar las fallas de caché, por lo que puede que no valga la pena, dependiendo del caso de uso. Pero si garantizar que no se pierda el caché es importante, esto lo logra.

 private readonly ConcurrentDictionary _keyLocks = new ConcurrentDictionary(); private readonly ConcurrentDictionary _cache = new ConcurrentDictionary(); public async Task GetSomethingAsync(string key) { string value; // get the semaphore specific to this key var keyLock = _keyLocks.GetOrAdd(key, x => new SemaphoreSlim(1)); await keyLock.WaitAsync(); try { // try to get value from cache if (!_cache.TryGetValue(key, out value)) { // if value isn't cached, get it the long way asynchronously value = await GetSomethingTheLongWayAsync(); // cache value _cache.TryAdd(key, value); } } finally { keyLock.Release(); } return value; } 

Edición: Como @mtkachenko mencionó en los comentarios, se podría realizar una verificación de caché adicional al comienzo de este método para saltear el paso de adquisición de locking.

Este trabajo para mí:

 ObjectCache _cache = MemoryCache.Default; static object _lockObject = new object(); public Task GetAsync(string cacheKey, Func> func, TimeSpan? cacheExpiration = null) where T : class { var task = (T)_cache[cacheKey]; if (task != null) return task; lock (_lockObject) { task = (T)_cache[cacheKey](cacheKey); if (task != null) return task; task = func(); Set(cacheKey, task, cacheExpiration); task.ContinueWith(t => { if (t.Status != TaskStatus.RanToCompletion) _cache.Remove(cacheKey); }); } return task; } 

Otra forma fácil de hacer esto es extender Lazy para que sea AsyncLazy , así:

 public class AsyncLazy : Lazy> { public AsyncLazy(Func> taskFactory, LazyThreadSafetyMode mode) : base(() => Task.Factory.StartNew(() => taskFactory()).Unwrap(), mode) { } public TaskAwaiter GetAwaiter() { return Value.GetAwaiter(); } } 

Entonces puedes hacer esto:

 private readonly ConcurrentDictionary> _cache = new ConcurrentDictionary>(); public async Task GetStuffAsync(string url) { return await _cache.GetOrAdd(url, new AsyncLazy( () => GetStuffInternalAsync(url), LazyThreadSafetyMode.ExecutionAndPublication)); }