From 3ff43fa8185a5d85242a8bc4635d654e7f671ae3 Mon Sep 17 00:00:00 2001 From: John Kerl Date: Sun, 25 Feb 2024 15:50:50 -0500 Subject: [PATCH] Miller produces no output on TSV with > 64K characters per line (#1505) * Switch to bufio.Reader, first pass * temp * Simplify ILineReader by making it stateless * Interface not necessary; ILineReader -> TLineReader * neaten * iterating --- pkg/cli/option_parse.go | 12 +- pkg/cli/separators.go | 4 + pkg/climain/mlrcli_parse.go | 10 +- pkg/input/line_reader.go | 209 +++++++++++++++++++++++--------- pkg/input/record_reader_json.go | 15 ++- pkg/input/record_reader_xtab.go | 14 ++- 6 files changed, 198 insertions(+), 66 deletions(-) diff --git a/pkg/cli/option_parse.go b/pkg/cli/option_parse.go index 3cec34dc6..f4c455366 100644 --- a/pkg/cli/option_parse.go +++ b/pkg/cli/option_parse.go @@ -8,6 +8,7 @@ package cli import ( "bufio" + "errors" "fmt" "io" "os" @@ -29,7 +30,7 @@ import ( // - IFS/IPS can have escapes like "\x1f" which aren't valid regex literals // so we unhex them. For example, from "\x1f" -- the four bytes '\', 'x', '1', 'f' // -- to the single byte with hex code 0x1f. -func FinalizeReaderOptions(readerOptions *TReaderOptions) { +func FinalizeReaderOptions(readerOptions *TReaderOptions) error { readerOptions.IFS = lib.UnhexStringLiteral(readerOptions.IFS) readerOptions.IPS = lib.UnhexStringLiteral(readerOptions.IPS) @@ -57,12 +58,17 @@ func FinalizeReaderOptions(readerOptions *TReaderOptions) { readerOptions.IFS = lib.UnbackslashStringLiteral(readerOptions.IFS) readerOptions.IPS = lib.UnbackslashStringLiteral(readerOptions.IPS) readerOptions.IRS = lib.UnbackslashStringLiteral(readerOptions.IRS) + + if readerOptions.IRS == "" { + return errors.New("empty IRS") + } + return nil } // FinalizeWriterOptions unbackslashes OPS, OFS, and ORS. This is because // because the '\n' at the command line which is Go "\\n" (a backslash and an // n) needs to become the single newline character., and likewise for "\t", etc. 
-func FinalizeWriterOptions(writerOptions *TWriterOptions) { +func FinalizeWriterOptions(writerOptions *TWriterOptions) error { if !writerOptions.ofsWasSpecified { writerOptions.OFS = defaultFSes[writerOptions.OutputFileFormat] } @@ -84,6 +90,8 @@ func FinalizeWriterOptions(writerOptions *TWriterOptions) { writerOptions.OFS = lib.UnbackslashStringLiteral(writerOptions.OFS) writerOptions.OPS = lib.UnbackslashStringLiteral(writerOptions.OPS) writerOptions.ORS = lib.UnbackslashStringLiteral(writerOptions.ORS) + + return nil } // ================================================================ diff --git a/pkg/cli/separators.go b/pkg/cli/separators.go index 6a52c3f2c..0a5278f64 100644 --- a/pkg/cli/separators.go +++ b/pkg/cli/separators.go @@ -82,6 +82,7 @@ var SEPARATOR_REGEX_NAMES_TO_VALUES = map[string]string{ // E.g. if IFS isn't specified, it's space for NIDX and comma for DKVP, etc. var defaultFSes = map[string]string{ + "gen": ",", "csv": ",", "csvlite": ",", "dkvp": ",", @@ -94,6 +95,7 @@ var defaultFSes = map[string]string{ } var defaultPSes = map[string]string{ + "gen": "N/A", "csv": "N/A", "csvlite": "N/A", "dkvp": "=", @@ -106,6 +108,7 @@ var defaultPSes = map[string]string{ } var defaultRSes = map[string]string{ + "gen": "\n", "csv": "\n", "csvlite": "\n", "dkvp": "\n", @@ -118,6 +121,7 @@ var defaultRSes = map[string]string{ } var defaultAllowRepeatIFSes = map[string]bool{ + "gen": false, "csv": false, "csvlite": false, "dkvp": false, diff --git a/pkg/climain/mlrcli_parse.go b/pkg/climain/mlrcli_parse.go index 586c94d78..eeecfa6dc 100644 --- a/pkg/climain/mlrcli_parse.go +++ b/pkg/climain/mlrcli_parse.go @@ -306,8 +306,14 @@ func parseCommandLinePassTwo( return nil, nil, err } - cli.FinalizeReaderOptions(&options.ReaderOptions) - cli.FinalizeWriterOptions(&options.WriterOptions) + err = cli.FinalizeReaderOptions(&options.ReaderOptions) + if err != nil { + return nil, nil, err + } + err = cli.FinalizeWriterOptions(&options.WriterOptions) + if err != nil { 
+ return nil, nil, err + } // Set an optional global formatter for floating-point values if options.WriterOptions.FPOFMT != "" { diff --git a/pkg/input/line_reader.go b/pkg/input/line_reader.go index c6b272609..6779b65db 100644 --- a/pkg/input/line_reader.go +++ b/pkg/input/line_reader.go @@ -7,79 +7,166 @@ import ( "bufio" "container/list" "io" + "strings" + + "github.com/johnkerl/miller/pkg/lib" ) type ILineReader interface { - Scan() bool - Text() string + // Read returns the string without the final newline (or whatever terminator). + // The error condition io.EOF as non-error "error" case. + // EOF is always returned with empty line: the code here is structured so that + // we do not return a non-empty line along with an EOF indicator. + Read() (string, error) } -type TLineReader struct { - scanner *bufio.Scanner +type DefaultLineReader struct { + underlying *bufio.Reader + eof bool } -// NewLineReader handles reading lines which may be delimited by multi-line separators, -// e.g. "\xe2\x90\x9e" for USV. -func NewLineReader(handle io.Reader, irs string) *TLineReader { - scanner := bufio.NewScanner(handle) +// SingleIRSLineReader handles reading lines with a single-character terminator. +type SingleIRSLineReader struct { + underlying *bufio.Reader + end_irs byte + eof bool +} - if irs == "\n" || irs == "\r\n" { - // Handled by default scanner. - } else { - irsbytes := []byte(irs) - irslen := len(irsbytes) +// MultiIRSLineReader handles reading lines which may be delimited by multi-line separators, e.g. +// "\xe2\x90\x9e" for USV. 
+type MultiIRSLineReader struct { + underlying *bufio.Reader + irs string + irs_len int + end_irs byte + eof bool +} - // Custom splitter - recordSplitter := func( - data []byte, - atEOF bool, - ) ( - advance int, - token []byte, - err error, - ) { - datalen := len(data) - end := datalen - irslen - for i := 0; i <= end; i++ { - if data[i] == irsbytes[0] { - match := true - for j := 1; j < irslen; j++ { - if data[i+j] != irsbytes[j] { - match = false - break - } - } - if match { - return i + irslen, data[:i], nil - } - } - } - if !atEOF { - return 0, nil, nil - } - // There is one final token to be delivered, which may be the empty string. - // Returning bufio.ErrFinalToken here tells Scan there are no more tokens after this - // but does not trigger an error to be returned from Scan itself. - return 0, data, bufio.ErrFinalToken +func NewLineReader(handle io.Reader, irs string) ILineReader { + underlying := bufio.NewReader(handle) + + irs_len := len(irs) + + // Not worth complicating the API by adding an error return. + // Empty IRS is checked elsewhere. + if irs_len < 1 { + panic("Empty IRS") + + } else if irs == "\n" || irs == "\r\n" { + return &DefaultLineReader{ + underlying: underlying, } - scanner.Split(recordSplitter) - } + } else if irs_len == 1 { + return &SingleIRSLineReader{ + underlying: underlying, + end_irs: irs[0], + } - return &TLineReader{ - scanner: scanner, + } else { + return &MultiIRSLineReader{ + underlying: underlying, + irs: irs, + irs_len: irs_len, + end_irs: irs[irs_len-1], + } } } -func (r *TLineReader) Scan() bool { - return r.scanner.Scan() +func (r *DefaultLineReader) Read() (string, error) { + + if r.eof { + return "", io.EOF + } + + line, err := r.underlying.ReadString('\n') + + // If we have EOF and a non-empty line, defer the EOF return to the next Read call. 
+ if len(line) > 0 && lib.IsEOF(err) { + r.eof = true + err = nil + } + + n := len(line) + if strings.HasSuffix(line, "\r\n") { + line = line[:n-2] + } else if strings.HasSuffix(line, "\n") { + line = line[:n-1] + } + + return line, err } -func (r *TLineReader) Text() string { - return r.scanner.Text() +func (r *SingleIRSLineReader) Read() (string, error) { + + if r.eof { + return "", io.EOF + } + + line, err := r.underlying.ReadString(r.end_irs) + + // If we have EOF and a non-empty line, defer the EOF return to the next Read call. + if len(line) > 0 && lib.IsEOF(err) { + r.eof = true + err = nil + } + + n := len(line) + if n > 0 && line[n-1] == r.end_irs { + line = line[:n-1] + } + + return line, err } -// TODO: comment copiously +func (r *MultiIRSLineReader) Read() (string, error) { + + // bufio.Reader.ReadString supports only a single-character terminator. So we read lines ending + // in the final character, until we get a line that ends in the entire sequence or EOF. + // + // Note that bufio.Scanner has a very nice bufio.Scanner.Split method which can be overridden to + // support custom line-ending logic. Sadly, though, bufio.Scanner _only_ supports a fixed + // maximum line length, and misbehaves badly when presented with longer lines. So we cannot use + // bufio.Scanner. See also https://github.com/johnkerl/miller/issues/1501. + + if r.eof { + return "", io.EOF + } + + line := "" + + for { + + piece, err := r.underlying.ReadString(r.end_irs) + + // If we have EOF and a non-empty line, defer the EOF return to the next Read call. 
+ line += piece // keep every piece, even one that ends in the last IRS byte without being a full IRS + if len(line) > 0 && lib.IsEOF(err) { + r.eof = true + err = nil + } + + if err != nil { + return line, err // includes io.EOF as a non-error "error" case + } + + // Match against the accumulated line: an IRS such as "bb" can straddle two pieces. + if strings.HasSuffix(line, r.irs) { + line = line[:len(line)-r.irs_len] + break + } + + if r.eof { + break // final unterminated line; the next Read call reports io.EOF + } + + } + + return line, nil +} + +// channelizedLineReader puts the line reading/splitting into its own goroutine in order to pipeline +// the I/O with regard to further processing. Used by record-readers for multiple file formats. // // Lines are written to the channel with their trailing newline (or whatever // IRS) stripped off. So, callers get "a=1,b=2" rather than "a=1,b=2\n". @@ -94,10 +181,20 @@ func channelizedLineReader( lines := list.New() - for lineReader.Scan() { + for { + line, err := lineReader.Read() + if err != nil { + if lib.IsEOF(err) { + done = true + break + } else { + break + } + } + i++ - lines.PushBack(lineReader.Text()) + lines.PushBack(line) // See if downstream processors will be ignoring further data (e.g. mlr // head). If so, stop reading. This makes 'mlr head hugefile' exit diff --git a/pkg/input/record_reader_json.go b/pkg/input/record_reader_json.go index 1607fb0a5..ecc44e061 100644 --- a/pkg/input/record_reader_json.go +++ b/pkg/input/record_reader_json.go @@ -233,13 +233,15 @@ func (bsr *JSONCommentEnabledReader) Read(p []byte) (n int, err error) { return bsr.populateFromLine(p), nil } + done := false + // Loop until we can get a non-comment line to pass on, or end of file.
- for { + for !done { // EOF - if !bsr.lineReader.Scan() { - return 0, io.EOF + line, err := bsr.lineReader.Read() + if err != nil { + return 0, err } - line := bsr.lineReader.Text() // Non-comment line if !strings.HasPrefix(line, bsr.readerOptions.CommentString) { @@ -255,7 +257,12 @@ func (bsr *JSONCommentEnabledReader) Read(p []byte) (n int, err error) { ell.PushBack(types.NewOutputString(line+"\n", bsr.context)) bsr.readerChannel <- ell } + + if done { + break + } } + return 0, nil } // populateFromLine is a helper for Read. It takes a full line from the diff --git a/pkg/input/record_reader_xtab.go b/pkg/input/record_reader_xtab.go index e683294cb..8dd88c308 100644 --- a/pkg/input/record_reader_xtab.go +++ b/pkg/input/record_reader_xtab.go @@ -4,6 +4,7 @@ import ( "container/list" "fmt" "io" + "os" "regexp" "strings" @@ -149,8 +150,17 @@ func channelizedStanzaScanner( stanzas := list.New() stanza := newStanza() - for lineReader.Scan() { - line := lineReader.Text() + for { + line, err := lineReader.Read() + if err != nil { + if lib.IsEOF(err) { + done = true + break + } else { + fmt.Fprintf(os.Stderr, "mlr: %#v\n", err) + break + } + } // Check for comments-in-data feature // TODO: function-pointer this away