IEnumerable to Stream

I would like to do something roughly equivalent to the code example below. I want to generate and serve a stream of data without necessarily having the entire data set in memory at any one time.

It seems I would need some implementation of Stream that accepts an IEnumerable<string> (or an IEnumerable<byte>) in its constructor. Internally, this Stream would only walk the IEnumerable as the Stream is being read, or as needed. But I don't know of any Stream implementation like this.

Am I on the right track? Do you know of any way to do something like this?

    public FileStreamResult GetResult()
    {
        IEnumerable<string> data = GetDataForStream();
        Stream dataStream = ToStringStream(Encoding.UTF8, data);
        return File(dataStream, "text/plain", "Result");
    }

    private IEnumerable<string> GetDataForStream()
    {
        for (int i = 0; i < 10000; i++)
        {
            yield return i.ToString();
            yield return "\r\n";
        }
    }

    private Stream ToStringStream(Encoding encoding, IEnumerable<string> data)
    {
        // I have to write my own implementation of stream?
        throw new NotImplementedException();
    }

I created a class called ProducerConsumerStream that does this. The producer writes data to the stream and the consumer reads it. There is a buffer in the middle so that the producer can "write ahead" a little. You can define the size of the buffer.

Anyway, if it's not exactly what you're looking for, I suspect it will give you a good idea of how it's done. See Building a New Type of Stream.

Update

The link went stale, so I've copied my code here. The original article is still available on the Wayback Machine at https://web.archive.org/web/20151210235510/http://www.informit.com/guides/content.aspx?g=dotnet&seqNum=852

First, the ProducerConsumerStream class:

    using System;
    using System.IO;
    using System.Threading;
    using System.Diagnostics;

    namespace Mischel.IO
    {
        // This class is safe for 1 producer and 1 consumer.
        public class ProducerConsumerStream : Stream
        {
            private byte[] CircleBuff;
            private int Head;
            private int Tail;

            public bool IsAddingCompleted { get; private set; }
            public bool IsCompleted { get; private set; }

            // For debugging
            private long TotalBytesRead = 0;
            private long TotalBytesWritten = 0;

            public ProducerConsumerStream(int size)
            {
                CircleBuff = new byte[size];
                Head = 1;
                Tail = 0;
            }

            [Conditional("JIM_DEBUG")]
            private void DebugOut(string msg)
            {
                Console.WriteLine(msg);
            }

            [Conditional("JIM_DEBUG")]
            private void DebugOut(string fmt, params object[] parms)
            {
                DebugOut(string.Format(fmt, parms));
            }

            private int ReadBytesAvailable
            {
                get
                {
                    if (Head > Tail)
                        return Head - Tail - 1;
                    else
                        return CircleBuff.Length - Tail + Head - 1;
                }
            }

            private int WriteBytesAvailable
            {
                get { return CircleBuff.Length - ReadBytesAvailable - 1; }
            }

            private void IncrementTail()
            {
                Tail = (Tail + 1) % CircleBuff.Length;
            }

            public override int Read(byte[] buffer, int offset, int count)
            {
                if (disposed)
                {
                    throw new ObjectDisposedException("The stream has been disposed.");
                }
                if (IsCompleted)
                {
                    throw new EndOfStreamException("The stream is empty and has been marked complete for adding.");
                }
                if (count == 0)
                {
                    return 0;
                }

                lock (CircleBuff)
                {
                    DebugOut("Read: requested {0:N0} bytes. Available = {1:N0}.", count, ReadBytesAvailable);
                    while (ReadBytesAvailable == 0)
                    {
                        if (IsAddingCompleted)
                        {
                            IsCompleted = true;
                            return 0;
                        }
                        Monitor.Wait(CircleBuff);
                    }

                    // If Head < Tail, then there are bytes available at the end of the buffer
                    // and also at the front of the buffer.
                    // If reading from Tail to the end doesn't fulfill the request,
                    // and there are still bytes available,
                    // then read from the start of the buffer.
                    DebugOut("Read: Head={0}, Tail={1}, Avail={2}", Head, Tail, ReadBytesAvailable);

                    IncrementTail();
                    int bytesToRead;
                    if (Tail > Head)
                    {
                        // When Tail > Head, we know that there are at least
                        // (CircleBuff.Length - Tail) bytes available in the buffer.
                        bytesToRead = CircleBuff.Length - Tail;
                    }
                    else
                    {
                        bytesToRead = Head - Tail;
                    }

                    // Don't read more than count bytes!
                    bytesToRead = Math.Min(bytesToRead, count);

                    Buffer.BlockCopy(CircleBuff, Tail, buffer, offset, bytesToRead);
                    Tail += (bytesToRead - 1);
                    int bytesRead = bytesToRead;

                    // At this point, either we've exhausted the buffer,
                    // or Tail is at the end of the buffer and has to wrap around.
                    if (bytesRead < count && ReadBytesAvailable > 0)
                    {
                        // We haven't fulfilled the read.
                        IncrementTail();
                        // Tail is always equal to 0 here.
                        bytesToRead = Math.Min((count - bytesRead), (Head - Tail));
                        Buffer.BlockCopy(CircleBuff, Tail, buffer, offset + bytesRead, bytesToRead);
                        bytesRead += bytesToRead;
                        Tail += (bytesToRead - 1);
                    }

                    TotalBytesRead += bytesRead;
                    DebugOut("Read: returning {0:N0} bytes. TotalRead={1:N0}", bytesRead, TotalBytesRead);
                    DebugOut("Read: Head={0}, Tail={1}, Avail={2}", Head, Tail, ReadBytesAvailable);

                    Monitor.Pulse(CircleBuff);
                    return bytesRead;
                }
            }

            public override void Write(byte[] buffer, int offset, int count)
            {
                if (disposed)
                {
                    throw new ObjectDisposedException("The stream has been disposed.");
                }
                if (IsAddingCompleted)
                {
                    throw new InvalidOperationException("The stream has been marked as complete for adding.");
                }

                lock (CircleBuff)
                {
                    DebugOut("Write: requested {0:N0} bytes. Available = {1:N0}", count, WriteBytesAvailable);
                    int bytesWritten = 0;
                    while (bytesWritten < count)
                    {
                        while (WriteBytesAvailable == 0)
                        {
                            Monitor.Wait(CircleBuff);
                        }
                        DebugOut("Write: Head={0}, Tail={1}, Avail={2}", Head, Tail, WriteBytesAvailable);
                        int bytesToCopy = Math.Min((count - bytesWritten), WriteBytesAvailable);
                        CopyBytes(buffer, offset + bytesWritten, bytesToCopy);
                        TotalBytesWritten += bytesToCopy;
                        DebugOut("Write: {0} bytes written. TotalWritten={1:N0}", bytesToCopy, TotalBytesWritten);
                        DebugOut("Write: Head={0}, Tail={1}, Avail={2}", Head, Tail, WriteBytesAvailable);
                        bytesWritten += bytesToCopy;
                        Monitor.Pulse(CircleBuff);
                    }
                }
            }

            private void CopyBytes(byte[] buffer, int srcOffset, int count)
            {
                // Insert at head
                // The copy might require two separate operations.

                // copy as much as can fit between Head and end of the circular buffer
                int offset = srcOffset;
                int bytesCopied = 0;
                int bytesToCopy = Math.Min(CircleBuff.Length - Head, count);
                if (bytesToCopy > 0)
                {
                    Buffer.BlockCopy(buffer, offset, CircleBuff, Head, bytesToCopy);
                    bytesCopied = bytesToCopy;
                    Head = (Head + bytesToCopy) % CircleBuff.Length;
                    offset += bytesCopied;
                }

                // Copy the remainder, which will go from the beginning of the buffer.
                if (bytesCopied < count)
                {
                    bytesToCopy = count - bytesCopied;
                    Buffer.BlockCopy(buffer, offset, CircleBuff, Head, bytesToCopy);
                    Head = (Head + bytesToCopy) % CircleBuff.Length;
                }
            }

            public void CompleteAdding()
            {
                if (disposed)
                {
                    throw new ObjectDisposedException("The stream has been disposed.");
                }
                lock (CircleBuff)
                {
                    DebugOut("CompleteAdding: {0:N0} bytes written.", TotalBytesWritten);
                    IsAddingCompleted = true;
                    Monitor.Pulse(CircleBuff);
                }
            }

            public override bool CanRead { get { return true; } }

            public override bool CanSeek { get { return false; } }

            public override bool CanWrite { get { return true; } }

            public override void Flush() { /* does nothing */ }

            public override long Length { get { throw new NotImplementedException(); } }

            public override long Position
            {
                get { throw new NotImplementedException(); }
                set { throw new NotImplementedException(); }
            }

            public override long Seek(long offset, SeekOrigin origin)
            {
                throw new NotImplementedException();
            }

            public override void SetLength(long value)
            {
                throw new NotImplementedException();
            }

            private bool disposed = false;

            protected override void Dispose(bool disposing)
            {
                if (!disposed)
                {
                    base.Dispose(disposing);
                    disposed = true;
                }
            }
        }
    }

And an example of how to use it:

    using System;
    using System.Text;
    using System.Threading;
    using Mischel.IO;

    class Program
    {
        static readonly string TestText = "This is a test of the emergency broadcast system.";
        static readonly byte[] TextBytes = Encoding.UTF8.GetBytes(TestText);

        const int Megabyte = 1024 * 1024;

        const int TestBufferSize = 12;

        const int ProducerBufferSize = 4;
        const int ConsumerBufferSize = 5;

        static void Main(string[] args)
        {
            Console.WriteLine("TextBytes contains {0:N0} bytes.", TextBytes.Length);
            using (var pcStream = new ProducerConsumerStream(TestBufferSize))
            {
                Thread ProducerThread = new Thread(ProducerThreadProc);
                Thread ConsumerThread = new Thread(ConsumerThreadProc);
                ProducerThread.Start(pcStream);
                // Let the producer fill the small buffer and block before the consumer starts.
                Thread.Sleep(2000);
                ConsumerThread.Start(pcStream);

                ProducerThread.Join();
                ConsumerThread.Join();
            }
            Console.Write("Done. Press Enter.");
            Console.ReadLine();
        }

        static void ProducerThreadProc(object state)
        {
            Console.WriteLine("Producer: started.");
            var pcStream = (ProducerConsumerStream)state;
            int offset = 0;
            // Count bytes, not characters: the two only coincide for ASCII text.
            while (offset < TextBytes.Length)
            {
                int bytesToWrite = Math.Min(ProducerBufferSize, TextBytes.Length - offset);
                pcStream.Write(TextBytes, offset, bytesToWrite);
                offset += bytesToWrite;
            }
            pcStream.CompleteAdding();
            Console.WriteLine("Producer: {0:N0} total bytes written.", offset);
            Console.WriteLine("Producer: exit.");
        }

        static void ConsumerThreadProc(object state)
        {
            Console.WriteLine("Consumer: started.");
            var instream = (ProducerConsumerStream)state;
            int testOffset = 0;

            var inputBuffer = new byte[TextBytes.Length];

            int bytesRead;
            do
            {
                int bytesToRead = Math.Min(ConsumerBufferSize, inputBuffer.Length - testOffset);
                bytesRead = instream.Read(inputBuffer, testOffset, bytesToRead);
                //Console.WriteLine("Consumer: {0:N0} bytes read.", bytesRead);
                testOffset += bytesRead;
            } while (bytesRead != 0);
            Console.WriteLine("Consumer: {0:N0} total bytes read.", testOffset);

            // Compare bytes read with TextBytes
            for (int i = 0; i < TextBytes.Length; ++i)
            {
                if (inputBuffer[i] != TextBytes[i])
                {
                    Console.WriteLine("Read error at position {0}", i);
                    break;
                }
            }
            Console.WriteLine("Consumer: exit.");
        }
    }

I had the same problem. In my case, a third-party package only accepts streams, but I have an IEnumerable and couldn't find an answer online, so I wrote my own, which I'll share:

    public class IEnumerableStringReader : TextReader
    {
        private readonly IEnumerator<string> _enumerator;
        private bool eof = false; // is set to true when .MoveNext tells us there is no more data.
        private char[] curLine = null;
        private int curLinePos = 0;
        private bool disposed = false;

        public IEnumerableStringReader(IEnumerable<string> input)
        {
            _enumerator = input.GetEnumerator();
        }

        private void GetNextLine()
        {
            if (eof) return;
            eof = !_enumerator.MoveNext();
            if (eof) return;
            // IEnumerable<string> input implies newlines exist between the lines.
            curLine = $"{_enumerator.Current}\r\n".ToCharArray();
            curLinePos = 0;
        }

        public override int Peek()
        {
            if (disposed) throw new ObjectDisposedException("The stream has been disposed.");
            if (curLine == null || curLinePos == curLine.Length) GetNextLine();
            if (eof) return -1;
            return curLine[curLinePos];
        }

        public override int Read()
        {
            if (disposed) throw new ObjectDisposedException("The stream has been disposed.");
            if (curLine == null || curLinePos == curLine.Length) GetNextLine();
            if (eof) return -1;
            return curLine[curLinePos++];
        }

        public override int Read(char[] buffer, int index, int count)
        {
            if (disposed) throw new ObjectDisposedException("The stream has been disposed.");
            if (count == 0) return 0;

            int charsReturned = 0;
            // Assuming we don't run out of input chars, we return count characters if we can.
            // If the space left in the buffer is not big enough, we return as many as will fit.
            int maxChars = Math.Min(count, buffer.Length - index);

            while (charsReturned < maxChars)
            {
                if (curLine == null || curLinePos == curLine.Length) GetNextLine();
                if (eof) return charsReturned;

                int maxCurrentCopy = maxChars - charsReturned;
                int charsAtTheReady = curLine.Length - curLinePos; // chars available in current line
                int copySize = Math.Min(maxCurrentCopy, charsAtTheReady); // stop at end of buffer.

                // Can't use Buffer.BlockCopy because it's byte based and we're dealing with chars.
                Array.ConstrainedCopy(curLine, curLinePos, buffer, index, copySize);

                index += copySize;
                curLinePos += copySize;
                charsReturned += copySize;
            }

            return charsReturned;
        }

        public override string ReadLine()
        {
            if (disposed) throw new ObjectDisposedException("The stream has been disposed.");
            if (curLine == null || curLinePos == curLine.Length) GetNextLine();
            if (eof) return null;

            if (curLinePos > 0) // this is necessary in case the client uses both Read() and ReadLine() calls
            {
                // Create a new string from the remainder of the char array.
                // The -2 is because GetNextLine appends a CRLF; Math.Max guards the case
                // where Read() has already consumed part of that CRLF.
                int remaining = Math.Max(0, (curLine.Length - curLinePos) - 2);
                var tmp = new string(curLine, curLinePos, remaining);
                curLinePos = curLine.Length; // so next call will re-read
                return tmp;
            }

            // read full line.
            curLinePos = curLine.Length; // so next call will re-read
            return _enumerator.Current; // if all the client does is call ReadLine this (faster) code path will be taken.
        }

        protected override void Dispose(bool disposing)
        {
            if (!disposed)
            {
                _enumerator.Dispose();
                base.Dispose(disposing);
                disposed = true;
            }
        }
    }

In my case, I want to use it as input for Datastreams.Csv:

    using (var tr = new IEnumerableStringReader(input))
    using (var reader = new CsvReader(tr))
    {
        while (reader.ReadRecord())
        {
            // do whatever
        }
    }

Steve Sadler wrote a perfectly working answer. However, he makes it much more difficult than it needs to be.

According to the reference source of TextReader, you only need to override Peek and Read:

A subclass must minimally implement the Peek() and Read() methods.

So first I write a function that converts an IEnumerable<string> into an IEnumerable<char>, where a new line is added at the end of each string:

    private static IEnumerable<char> ReadCharacters(IEnumerable<string> lines)
    {
        foreach (string line in lines)
        {
            foreach (char c in line + Environment.NewLine)
            {
                yield return c;
            }
        }
    }

Environment.NewLine is the part that adds the new line at the end of each string.

Now the class is fairly straightforward:

    class EnumStringReader : TextReader
    {
        public EnumStringReader(IEnumerable<string> lines)
        {
            this.enumerator = ReadCharacters(lines).GetEnumerator();
            this.dataAvailable = this.enumerator.MoveNext();
        }

        private bool disposed = false;
        private bool dataAvailable;
        private readonly IEnumerator<char> enumerator;

The constructor takes a sequence of lines to read. It uses this sequence and the function written above to convert it into a sequence of characters with Environment.NewLine appended to each line.

It gets the enumerator of the converted sequence and moves to the first character. It remembers in dataAvailable whether there is a first character.

Now we are ready to Peek: if no data is available, return -1; otherwise return the current character as an int. Do not move forward:

        public override int Peek()
        {
            this.ThrowIfDisposed();
            return this.dataAvailable ? this.enumerator.Current : -1;
        }

Read: if no data is available, return -1; otherwise return the current character as an int. Move forward to the next character and remember whether there is data available:

        public override int Read()
        {
            this.ThrowIfDisposed();
            if (this.dataAvailable)
            {
                char nextChar = this.enumerator.Current;
                this.dataAvailable = this.enumerator.MoveNext();
                return (int)nextChar;
            }
            else
            {
                return -1;
            }
        }

Don't forget to override Dispose(bool), where you dispose the enumerator.

That is all that is needed. All the other functions will use these two.
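For reference, the fragments above can be assembled into one compilable class. This is only a sketch: the bodies of ThrowIfDisposed and Dispose(bool) are assumptions, since the answer names them without showing them, and EnumStringReaderDemo is a hypothetical driver added for illustration. ReadLine is not overridden here, so the demo also shows the inherited TextReader.ReadLine working on top of Peek and Read alone.

```csharp
using System;
using System.Collections.Generic;
using System.IO;

public sealed class EnumStringReader : TextReader
{
    private readonly IEnumerator<char> enumerator;
    private bool dataAvailable;
    private bool disposed;

    public EnumStringReader(IEnumerable<string> lines)
    {
        enumerator = ReadCharacters(lines).GetEnumerator();
        dataAvailable = enumerator.MoveNext();
    }

    // Flattens the lines into characters, appending a newline to each line.
    private static IEnumerable<char> ReadCharacters(IEnumerable<string> lines)
    {
        foreach (string line in lines)
            foreach (char c in line + Environment.NewLine)
                yield return c;
    }

    // Assumed helper: the answer calls it but does not show its body.
    private void ThrowIfDisposed()
    {
        if (disposed) throw new ObjectDisposedException(nameof(EnumStringReader));
    }

    public override int Peek()
    {
        ThrowIfDisposed();
        return dataAvailable ? enumerator.Current : -1;
    }

    public override int Read()
    {
        ThrowIfDisposed();
        if (!dataAvailable) return -1;
        char c = enumerator.Current;
        dataAvailable = enumerator.MoveNext();
        return c;
    }

    // Assumed implementation of the Dispose(bool) override the answer asks for.
    protected override void Dispose(bool disposing)
    {
        if (!disposed)
        {
            if (disposing) enumerator.Dispose();
            disposed = true;
        }
        base.Dispose(disposing);
    }
}

public static class EnumStringReaderDemo
{
    public static void Main()
    {
        using (var reader = new EnumStringReader(new[] { "first", "second" }))
        {
            Console.WriteLine(reader.ReadLine());         // first
            Console.WriteLine(reader.ReadLine());         // second
            Console.WriteLine(reader.ReadLine() == null); // True
        }
    }
}
```

Because the constructor calls MoveNext once, the first line of the source sequence is pulled as soon as the reader is created; every later line is pulled only when the reader reaches it.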

Now to fill your stream with the lines:

    IEnumerable<string> lines = ...
    using (TextWriter writer = System.IO.File.CreateText(...))
    {
        using (TextReader reader = new EnumStringReader(lines))
        {
            // The three loops below are alternatives: use only one of them,
            // because each one consumes the reader to the end.

            // either write per char:
            while (reader.Peek() != -1)
            {
                char c = (char)reader.Read();
                writer.Write(c);
            }

            // or write per line:
            string line = reader.ReadLine();
            // line is without newLine!
            while (line != null)
            {
                writer.WriteLine(line);
                line = reader.ReadLine();
            }

            // or write per block:
            char[] buf = new char[4096];
            int nrRead = reader.ReadBlock(buf, 0, buf.Length);
            while (nrRead > 0)
            {
                writer.Write(buf, 0, nrRead);
                nrRead = reader.ReadBlock(buf, 0, buf.Length);
            }
        }
    }
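The question asks for a Stream rather than a TextReader, and the same pull-on-demand idea can be sketched as a read-only Stream. EnumerableStream and EnumerableStreamDemo below are hypothetical names, not part of any answer above or of the framework. The sketch encodes one string at a time, so it assumes no element ends in the middle of a surrogate pair.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;

// Hypothetical read-only Stream that pulls and encodes strings only as Read needs them.
public sealed class EnumerableStream : Stream
{
    private readonly IEnumerator<string> source;
    private readonly Encoding encoding;
    private byte[] current = new byte[0]; // bytes of the string being served
    private int position;                 // next unread index in current

    public EnumerableStream(IEnumerable<string> data, Encoding encoding)
    {
        source = data.GetEnumerator();
        this.encoding = encoding;
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        int total = 0;
        while (total < count)
        {
            if (position == current.Length)
            {
                // Current chunk is used up: pull the next string, or stop at the end.
                if (!source.MoveNext()) break;
                current = encoding.GetBytes(source.Current);
                position = 0;
                continue; // re-check, the string may have been empty
            }
            int n = Math.Min(count - total, current.Length - position);
            Buffer.BlockCopy(current, position, buffer, offset + total, n);
            position += n;
            total += n;
        }
        return total; // 0 signals end of stream
    }

    public override bool CanRead { get { return true; } }
    public override bool CanSeek { get { return false; } }
    public override bool CanWrite { get { return false; } }
    public override void Flush() { }
    public override long Length { get { throw new NotSupportedException(); } }
    public override long Position
    {
        get { throw new NotSupportedException(); }
        set { throw new NotSupportedException(); }
    }
    public override long Seek(long offset, SeekOrigin origin) { throw new NotSupportedException(); }
    public override void SetLength(long value) { throw new NotSupportedException(); }
    public override void Write(byte[] buffer, int offset, int count) { throw new NotSupportedException(); }

    protected override void Dispose(bool disposing)
    {
        if (disposing) source.Dispose();
        base.Dispose(disposing);
    }
}

public static class EnumerableStreamDemo
{
    // Same shape as GetDataForStream in the question, with fewer items.
    public static IEnumerable<string> Numbers()
    {
        for (int i = 0; i < 3; i++)
        {
            yield return i.ToString();
            yield return "\r\n";
        }
    }

    public static void Main()
    {
        using (var stream = new EnumerableStream(Numbers(), Encoding.UTF8))
        using (var reader = new StreamReader(stream, Encoding.UTF8))
        {
            Console.Write(reader.ReadToEnd()); // 0, 1 and 2, each followed by CRLF
        }
    }
}
```

Unlike ProducerConsumerStream, this needs no second thread: the enumerable is advanced on the reader's own thread, and only as far as each Read call requires.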