mirror of
https://github.com/johnkerl/miller.git
synced 2026-01-23 10:15:36 +00:00
222 lines
4.8 KiB
Go
222 lines
4.8 KiB
Go
// This file contains the line-reader interface and implementations used by the
// file-format-specific record-readers, as well as a collection of utility functions.
|
|
|
|
package input
|
|
|
|
import (
	"bufio"
	"fmt"
	"io"
	"os"
	"strings"

	"github.com/johnkerl/miller/v6/pkg/lib"
)
|
|
|
|
// ILineReader yields successive lines from an input stream.
type ILineReader interface {
	// Read returns the next line with its terminator (newline or other IRS)
	// removed.
	//
	// End of input is signaled by io.EOF, which callers should treat as a
	// normal "non-error" error. io.EOF is always paired with an empty line:
	// implementations never return a non-empty line together with io.EOF, so
	// a final line lacking a terminator comes back first with a nil error and
	// io.EOF is reported on the following call.
	Read() (string, error)
}
|
|
|
|
// DefaultLineReader reads newline-terminated lines. It is used when the IRS is
// "\n" or "\r\n"; its Read strips a trailing "\n" along with a "\r" directly
// before it.
type DefaultLineReader struct {
	// underlying is the buffered source the lines are read from.
	underlying *bufio.Reader
	// eof is set once a final, unterminated line has been returned; every
	// subsequent Read then reports io.EOF without touching underlying.
	eof bool
}
|
|
|
|
// SingleIRSLineReader reads lines separated by a single-byte IRS other than
// "\n" (for example ";").
type SingleIRSLineReader struct {
	// underlying is the buffered source the lines are read from.
	underlying *bufio.Reader
	// end_irs is the one-byte record separator.
	end_irs byte
	// eof is set once a final, unterminated line has been returned; every
	// subsequent Read then reports io.EOF.
	eof bool
}
|
|
|
|
// MultiIRSLineReader reads lines delimited by a multi-byte (multi-character)
// separator, e.g. "\xe2\x90\x9e" for USV.
type MultiIRSLineReader struct {
	// underlying is the buffered source the lines are read from.
	underlying *bufio.Reader
	// irs is the full separator sequence.
	irs string
	// irs_len caches len(irs).
	irs_len int
	// end_irs is the last byte of irs; reads are chunked on this byte before
	// the full sequence is matched.
	end_irs byte
	// eof is set once end of input has been reached and the last line (if
	// any) has been returned; every subsequent Read then reports io.EOF.
	eof bool
}
|
|
|
|
func NewLineReader(handle io.Reader, irs string) ILineReader {
|
|
underlying := bufio.NewReader(handle)
|
|
|
|
irs_len := len(irs)
|
|
|
|
// Not worth complicating the API by adding an error return.
|
|
// Empty IRS is checked elsewhere.
|
|
if irs_len < 1 {
|
|
panic("Empty IRS")
|
|
|
|
} else if irs == "\n" || irs == "\r\n" {
|
|
return &DefaultLineReader{
|
|
underlying: underlying,
|
|
}
|
|
|
|
} else if irs_len == 1 {
|
|
return &SingleIRSLineReader{
|
|
underlying: underlying,
|
|
end_irs: irs[0],
|
|
}
|
|
|
|
} else {
|
|
return &MultiIRSLineReader{
|
|
underlying: underlying,
|
|
irs: irs,
|
|
irs_len: irs_len,
|
|
end_irs: irs[irs_len-1],
|
|
}
|
|
}
|
|
}
|
|
|
|
func (r *DefaultLineReader) Read() (string, error) {
|
|
|
|
if r.eof {
|
|
return "", io.EOF
|
|
}
|
|
|
|
line, err := r.underlying.ReadString('\n')
|
|
|
|
// If we have EOF and a non-empty line, defer the EOF return to the next Read call.
|
|
if len(line) > 0 && lib.IsEOF(err) {
|
|
r.eof = true
|
|
err = nil
|
|
}
|
|
|
|
n := len(line)
|
|
if strings.HasSuffix(line, "\r\n") {
|
|
line = line[:n-2]
|
|
} else if strings.HasSuffix(line, "\n") {
|
|
line = line[:n-1]
|
|
}
|
|
|
|
return line, err
|
|
}
|
|
|
|
func (r *SingleIRSLineReader) Read() (string, error) {
|
|
|
|
if r.eof {
|
|
return "", io.EOF
|
|
}
|
|
|
|
line, err := r.underlying.ReadString(r.end_irs)
|
|
|
|
// If we have EOF and a non-empty line, defer the EOF return to the next Read call.
|
|
if len(line) > 0 && lib.IsEOF(err) {
|
|
r.eof = true
|
|
err = nil
|
|
}
|
|
|
|
n := len(line)
|
|
if n > 0 && line[n-1] == r.end_irs {
|
|
line = line[:n-1]
|
|
}
|
|
|
|
return line, err
|
|
}
|
|
|
|
func (r *MultiIRSLineReader) Read() (string, error) {
|
|
|
|
// bufio.Reader.ReadString supports only a single-character terminator. So we read lines ending
|
|
// in the final character, until we get a line that ends in the entire sequence or EOF.
|
|
//
|
|
// Note that bufio.Scanner has a very nice bufio.Scanner.Split method which can be overridden to
|
|
// support custom line-ending logic. Sadly, though, bufio.Scanner _only_ supports a fixed
|
|
// maximum line length, and misbehaves badly when presented with longer lines. So we cannot use
|
|
// bufio.Scanner. See also https://github.com/johnkerl/miller/issues/1501.
|
|
|
|
if r.eof {
|
|
return "", io.EOF
|
|
}
|
|
|
|
line := ""
|
|
|
|
for {
|
|
|
|
piece, err := r.underlying.ReadString(r.end_irs)
|
|
|
|
// If we have EOF and a non-empty line, defer the EOF return to the next Read call.
|
|
if len(piece) > 0 && lib.IsEOF(err) {
|
|
r.eof = true
|
|
err = nil
|
|
}
|
|
|
|
if err != nil {
|
|
return line, err // includes io.EOF as a non-error "error" case
|
|
}
|
|
|
|
if strings.HasSuffix(piece, r.irs) {
|
|
piece = piece[:len(piece)-r.irs_len]
|
|
line += piece
|
|
break
|
|
}
|
|
|
|
if r.eof {
|
|
line += piece
|
|
break
|
|
}
|
|
|
|
}
|
|
|
|
return line, nil
|
|
}
|
|
|
|
// channelizedLineReader puts the line reading/splitting into its own goroutine in order to pipeline
|
|
// the I/O with regard to further processing. Used by record-readers for multiple file formats.
|
|
//
|
|
// Lines are written to the channel with their trailing newline (or whatever
|
|
// IRS) stripped off. So, callers get "a=1,b=2" rather than "a=1,b=2\n".
|
|
func channelizedLineReader(
|
|
lineReader ILineReader,
|
|
linesChannel chan<- []string,
|
|
downstreamDoneChannel <-chan bool, // for mlr head
|
|
recordsPerBatch int64,
|
|
) {
|
|
i := int64(0)
|
|
done := false
|
|
|
|
lines := make([]string, recordsPerBatch)
|
|
|
|
for {
|
|
line, err := lineReader.Read()
|
|
if err != nil {
|
|
if lib.IsEOF(err) {
|
|
done = true
|
|
break
|
|
} else {
|
|
break
|
|
}
|
|
}
|
|
|
|
i++
|
|
|
|
lines = append(lines, line)
|
|
|
|
// See if downstream processors will be ignoring further data (e.g. mlr
|
|
// head). If so, stop reading. This makes 'mlr head hugefile' exit
|
|
// quickly, as it should.
|
|
if i%recordsPerBatch == 0 {
|
|
select {
|
|
case <-downstreamDoneChannel:
|
|
done = true
|
|
break
|
|
default:
|
|
break
|
|
}
|
|
if done {
|
|
break
|
|
}
|
|
linesChannel <- lines
|
|
lines = make([]string, recordsPerBatch)
|
|
}
|
|
|
|
if done {
|
|
break
|
|
}
|
|
}
|
|
linesChannel <- lines
|
|
close(linesChannel) // end-of-stream marker
|
|
}
|