diff --git a/pkg/input/line_reader.go b/pkg/input/line_reader.go
index b1f965307..00d761b33 100644
--- a/pkg/input/line_reader.go
+++ b/pkg/input/line_reader.go
@@ -5,7 +5,6 @@ package input
 
 import (
 	"bufio"
-	"container/list"
 	"io"
 	"strings"
 
@@ -172,14 +171,14 @@ func (r *MultiIRSLineReader) Read() (string, error) {
 // IRS) stripped off. So, callers get "a=1,b=2" rather than "a=1,b=2\n".
 func channelizedLineReader(
 	lineReader ILineReader,
-	linesChannel chan<- *list.List,
+	linesChannel chan<- []string,
 	downstreamDoneChannel <-chan bool, // for mlr head
 	recordsPerBatch int64,
 ) {
 	i := int64(0)
 	done := false
 
-	lines := list.New()
+	lines := make([]string, 0, recordsPerBatch)
 
 	for {
 		line, err := lineReader.Read()
@@ -194,7 +193,7 @@ func channelizedLineReader(
 
 		i++
 
-		lines.PushBack(line)
+		lines = append(lines, line)
 
 		// See if downstream processors will be ignoring further data (e.g. mlr
 		// head). If so, stop reading. This makes 'mlr head hugefile' exit
@@ -211,7 +210,7 @@ func channelizedLineReader(
 			break
 		}
 		linesChannel <- lines
-		lines = list.New()
+		lines = make([]string, 0, recordsPerBatch)
 	}
 
 	if done {
diff --git a/pkg/input/pseudo_reader_gen.go b/pkg/input/pseudo_reader_gen.go
index e847b59ab..3f0e3936a 100644
--- a/pkg/input/pseudo_reader_gen.go
+++ b/pkg/input/pseudo_reader_gen.go
@@ -1,7 +1,6 @@
 package input
 
 import (
-	"container/list"
 	"fmt"
 
 	"github.com/johnkerl/miller/v6/pkg/bifs"
@@ -28,7 +27,7 @@ func NewPseudoReaderGen(
 func (reader *PseudoReaderGen) Read(
 	filenames []string, // ignored
 	context types.Context,
-	readerChannel chan<- *list.List, // list of *types.RecordAndContext
+	readerChannel chan<- []*types.RecordAndContext,
 	errorChannel chan error,
 	downstreamDoneChannel <-chan bool, // for mlr head
 ) {
@@ -38,7 +37,7 @@
 
 func (reader *PseudoReaderGen) process(
 	context *types.Context,
-	readerChannel chan<- *list.List, // list of *types.RecordAndContext
+	readerChannel chan<- []*types.RecordAndContext,
 	errorChannel chan error,
 	downstreamDoneChannel <-chan bool, // for mlr head
 ) {
@@ -71,7 +70,7 @@ func (reader *PseudoReaderGen) process(
 	key := reader.readerOptions.GeneratorOptions.FieldName
 	value := start.Copy()
 
-	recordsAndContexts := list.New()
+	recordsAndContexts := make([]*types.RecordAndContext, 0, recordsPerBatch)
 
 	eof := false
 	for !eof {
@@ -84,11 +83,11 @@ func (reader *PseudoReaderGen) process(
 			record.PutCopy(key, value)
 
 			context.UpdateForInputRecord()
-			recordsAndContexts.PushBack(types.NewRecordAndContext(record, context))
+			recordsAndContexts = append(recordsAndContexts, types.NewRecordAndContext(record, context))
 
-			if int64(recordsAndContexts.Len()) >= recordsPerBatch {
+			if int64(len(recordsAndContexts)) >= recordsPerBatch {
 				readerChannel <- recordsAndContexts
-				recordsAndContexts = list.New()
+				recordsAndContexts = make([]*types.RecordAndContext, 0, recordsPerBatch)
 
 				// See if downstream processors will be ignoring further data (e.g.
 				// mlr head). If so, stop reading. This makes 'mlr head hugefile'
@@ -111,7 +110,7 @@ func (reader *PseudoReaderGen) process(
 		value = bifs.BIF_plus_binary(value, step)
 	}
 
-	if recordsAndContexts.Len() > 0 {
+	if len(recordsAndContexts) > 0 {
 		readerChannel <- recordsAndContexts
 	}
 }
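Note on the allocation idiom above: every batch in this patch is created with length zero and capacity `recordsPerBatch`, then filled with `append`. Writing `make([]string, recordsPerBatch)` instead would create `recordsPerBatch` zero-valued elements up front, and appends would land after them, so each batch would begin with a run of empty strings (or nil record pointers). A standalone sketch of the distinction, separate from the patch itself:

```go
package main

import "fmt"

func main() {
	const recordsPerBatch = 4

	// The idiom used in this patch: length 0, capacity recordsPerBatch.
	// append fills the pre-allocated backing array without reallocating.
	batch := make([]string, 0, recordsPerBatch)
	batch = append(batch, "a=1", "b=2")
	fmt.Println(len(batch), cap(batch)) // 2 4

	// The pitfall: length recordsPerBatch. Four zero-value strings
	// precede anything appended.
	wrong := make([]string, recordsPerBatch)
	wrong = append(wrong, "a=1", "b=2")
	fmt.Println(len(wrong), wrong[0] == "") // 6 true
}
```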
diff --git a/pkg/input/record_reader.go b/pkg/input/record_reader.go
index 3ad932f2f..37f9e500b 100644
--- a/pkg/input/record_reader.go
+++ b/pkg/input/record_reader.go
@@ -4,8 +4,6 @@
 package input
 
 import (
-	"container/list"
-
 	"github.com/johnkerl/miller/v6/pkg/types"
 )
 
@@ -19,7 +17,7 @@ type IRecordReader interface {
 	Read(
 		filenames []string,
 		initialContext types.Context,
-		readerChannel chan<- *list.List, // list of *types.RecordAndContext
+		readerChannel chan<- []*types.RecordAndContext,
 		errorChannel chan error,
 		downstreamDoneChannel <-chan bool, // for mlr head
 	)
diff --git a/pkg/input/record_reader_csv.go b/pkg/input/record_reader_csv.go
index 6ed07250d..88cc6f644 100644
--- a/pkg/input/record_reader_csv.go
+++ b/pkg/input/record_reader_csv.go
@@ -52,7 +52,7 @@ func NewRecordReaderCSV(
 func (reader *RecordReaderCSV) Read(
 	filenames []string,
 	context types.Context,
-	readerChannel chan<- *list.List, // list of *types.RecordAndContext
+	readerChannel chan<- []*types.RecordAndContext,
 	errorChannel chan error,
 	downstreamDoneChannel <-chan bool, // for mlr head
 ) {
@@ -92,7 +92,7 @@ func (reader *RecordReaderCSV) processHandle(
 	handle io.Reader,
 	filename string,
 	context *types.Context,
-	readerChannel chan<- *list.List, // list of *types.RecordAndContext
+	readerChannel chan<- []*types.RecordAndContext,
 	errorChannel chan error,
 	downstreamDoneChannel <-chan bool, // for mlr head
 ) {
@@ -115,7 +115,7 @@ func (reader *RecordReaderCSV) processHandle(
 
 	for {
 		recordsAndContexts, eof := reader.getRecordBatch(csvRecordsChannel, errorChannel, context)
-		if recordsAndContexts.Len() > 0 {
+		if len(recordsAndContexts) > 0 {
 			readerChannel <- recordsAndContexts
 		}
 		if eof {
@@ -185,10 +185,10 @@ func (reader *RecordReaderCSV) getRecordBatch(
 	errorChannel chan error,
 	context *types.Context,
 ) (
-	recordsAndContexts *list.List,
+	recordsAndContexts []*types.RecordAndContext,
 	eof bool,
 ) {
-	recordsAndContexts = list.New()
+	recordsAndContexts = make([]*types.RecordAndContext, 0, reader.recordsPerBatch)
 	dedupeFieldNames := reader.readerOptions.DedupeFieldNames
 
 	csvRecords, more := <-csvRecordsChannel
@@ -279,7 +279,7 @@ func (reader *RecordReaderCSV) getRecordBatch(
 
 		context.UpdateForInputRecord()
 
-		recordsAndContexts.PushBack(types.NewRecordAndContext(record, context))
+		recordsAndContexts = append(recordsAndContexts, types.NewRecordAndContext(record, context))
 	}
 
 	return recordsAndContexts, false
@@ -290,7 +290,7 @@
 func (reader *RecordReaderCSV) maybeConsumeComment(
 	csvRecord []string,
 	context *types.Context,
-	recordsAndContexts *list.List, // list of *types.RecordAndContext
+	recordsAndContexts *[]*types.RecordAndContext,
 ) bool {
 	if reader.readerOptions.CommentHandling == cli.CommentsAreData {
 		// Nothing is to be construed as a comment
@@ -323,7 +323,7 @@ func (reader *RecordReaderCSV) maybeConsumeComment(
 		csvWriter.Comma = rune(reader.ifs0)
 		csvWriter.Write(csvRecord)
 		csvWriter.Flush()
-		recordsAndContexts.PushBack(types.NewOutputString(buffer.String(), context))
+		*recordsAndContexts = append(*recordsAndContexts, types.NewOutputString(buffer.String(), context))
 	} else /* reader.readerOptions.CommentHandling == cli.SkipComments */ {
 		// discard entirely
 	}
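The new `maybeConsumeComment` signature takes `*[]*types.RecordAndContext` rather than the slice itself. This is forced by Go's slice semantics: a callee that appends to a slice received by value may grow a new backing array, and it always updates only its local copy of the length, so the caller never sees the pushed comment line. (The corresponding call site, outside the hunks shown here, passes `&recordsAndContexts`.) A minimal sketch of the pitfall, with hypothetical names:

```go
package main

import "fmt"

// appendByValue mutates only its local copy of the slice header;
// the caller's length is unchanged afterward.
func appendByValue(batch []string, line string) {
	batch = append(batch, line)
	_ = batch
}

// appendViaPointer writes the grown header back through the pointer,
// so the caller observes the new element.
func appendViaPointer(batch *[]string, line string) {
	*batch = append(*batch, line)
}

func main() {
	batch := make([]string, 0, 8)

	appendByValue(batch, "# comment")
	fmt.Println(len(batch)) // 0 -- the append was lost

	appendViaPointer(&batch, "# comment")
	fmt.Println(len(batch)) // 1
}
```

Returning the updated slice is the other conventional fix, and the batch getters elsewhere in this patch do exactly that; the pointer form is used here so that `maybeConsumeComment` can keep its boolean return.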
diff --git a/pkg/input/record_reader_csvlite.go b/pkg/input/record_reader_csvlite.go
index d658a4f99..8c9ecb8e7 100644
--- a/pkg/input/record_reader_csvlite.go
+++ b/pkg/input/record_reader_csvlite.go
@@ -19,7 +19,6 @@ package input
 //   3,4,5,6    3,4,5
 
 import (
-	"container/list"
 	"fmt"
 	"io"
 	"strconv"
@@ -35,12 +34,12 @@ import (
 // implicit-CSV-header record-batch getter.
 type recordBatchGetterCSV func(
 	reader *RecordReaderCSVLite,
-	linesChannel <-chan *list.List,
+	linesChannel <-chan []string,
 	filename string,
 	context *types.Context,
 	errorChannel chan error,
 ) (
-	recordsAndContexts *list.List,
+	recordsAndContexts []*types.RecordAndContext,
 	eof bool,
 )
 
@@ -81,7 +80,7 @@ func NewRecordReaderCSVLite(
 func (reader *RecordReaderCSVLite) Read(
 	filenames []string,
 	context types.Context,
-	readerChannel chan<- *list.List, // list of *types.RecordAndContext
+	readerChannel chan<- []*types.RecordAndContext,
 	errorChannel chan error,
 	downstreamDoneChannel <-chan bool, // for mlr head
 ) {
@@ -135,7 +134,7 @@ func (reader *RecordReaderCSVLite) processHandle(
 	handle io.Reader,
 	filename string,
 	context *types.Context,
-	readerChannel chan<- *list.List, // list of *types.RecordAndContext
+	readerChannel chan<- []*types.RecordAndContext,
 	errorChannel chan error,
 	downstreamDoneChannel <-chan bool, // for mlr head
 ) {
@@ -145,12 +144,12 @@ func (reader *RecordReaderCSVLite) processHandle(
 	recordsPerBatch := reader.recordsPerBatch
 	lineReader := NewLineReader(handle, reader.readerOptions.IRS)
 
-	linesChannel := make(chan *list.List, recordsPerBatch)
+	linesChannel := make(chan []string, recordsPerBatch)
 	go channelizedLineReader(lineReader, linesChannel, downstreamDoneChannel, recordsPerBatch)
 
 	for {
 		recordsAndContexts, eof := reader.recordBatchGetter(reader, linesChannel, filename, context, errorChannel)
-		if recordsAndContexts.Len() > 0 {
+		if len(recordsAndContexts) > 0 {
 			readerChannel <- recordsAndContexts
 		}
 		if eof {
@@ -161,15 +160,15 @@ func (reader *RecordReaderCSVLite) processHandle(
 
 func getRecordBatchExplicitCSVHeader(
 	reader *RecordReaderCSVLite,
-	linesChannel <-chan *list.List,
+	linesChannel <-chan []string,
 	filename string,
 	context *types.Context,
 	errorChannel chan error,
 ) (
-	recordsAndContexts *list.List,
+	recordsAndContexts []*types.RecordAndContext,
 	eof bool,
 ) {
-	recordsAndContexts = list.New()
+	recordsAndContexts = make([]*types.RecordAndContext, 0, reader.recordsPerBatch)
 	dedupeFieldNames := reader.readerOptions.DedupeFieldNames
 
 	lines, more := <-linesChannel
@@ -177,9 +176,7 @@ func getRecordBatchExplicitCSVHeader(
 		return recordsAndContexts, true
 	}
 
-	for e := lines.Front(); e != nil; e = e.Next() {
-		line := e.Value.(string)
-
+	for _, line := range lines {
 		reader.inputLineNumber++
 
 		// Strip CSV BOM
@@ -194,7 +191,7 @@ func getRecordBatchExplicitCSVHeader(
 		if reader.readerOptions.CommentHandling != cli.CommentsAreData {
 			if strings.HasPrefix(line, reader.readerOptions.CommentString) {
 				if reader.readerOptions.CommentHandling == cli.PassComments {
-					recordsAndContexts.PushBack(types.NewOutputString(line+"\n", context))
+					recordsAndContexts = append(recordsAndContexts, types.NewOutputString(line+"\n", context))
 					continue
 				} else if reader.readerOptions.CommentHandling == cli.SkipComments {
 					continue
@@ -275,7 +272,7 @@ func getRecordBatchExplicitCSVHeader(
 			}
 
 			context.UpdateForInputRecord()
-			recordsAndContexts.PushBack(types.NewRecordAndContext(record, context))
+			recordsAndContexts = append(recordsAndContexts, types.NewRecordAndContext(record, context))
 		}
 	}
 
@@ -284,15 +281,15 @@ func getRecordBatchExplicitCSVHeader(
 
 func getRecordBatchImplicitCSVHeader(
 	reader *RecordReaderCSVLite,
-	linesChannel <-chan *list.List,
+	linesChannel <-chan []string,
 	filename string,
 	context *types.Context,
 	errorChannel chan error,
 ) (
-	recordsAndContexts *list.List,
+	recordsAndContexts []*types.RecordAndContext,
 	eof bool,
 ) {
-	recordsAndContexts = list.New()
+	recordsAndContexts = make([]*types.RecordAndContext, 0, reader.recordsPerBatch)
 	dedupeFieldNames := reader.readerOptions.DedupeFieldNames
 
 	lines, more := <-linesChannel
@@ -300,9 +297,7 @@ func getRecordBatchImplicitCSVHeader(
 		return recordsAndContexts, true
 	}
 
-	for e := lines.Front(); e != nil; e = e.Next() {
-		line := e.Value.(string)
-
+	for _, line := range lines {
 		reader.inputLineNumber++
 
 		// Check for comments-in-data feature
@@ -310,7 +305,7 @@ func getRecordBatchImplicitCSVHeader(
 		if reader.readerOptions.CommentHandling != cli.CommentsAreData {
 			if strings.HasPrefix(line, reader.readerOptions.CommentString) {
 				if reader.readerOptions.CommentHandling == cli.PassComments {
-					recordsAndContexts.PushBack(types.NewOutputString(line+"\n", context))
+					recordsAndContexts = append(recordsAndContexts, types.NewOutputString(line+"\n", context))
 					continue
 				} else if reader.readerOptions.CommentHandling == cli.SkipComments {
 					continue
@@ -402,7 +397,7 @@ func getRecordBatchImplicitCSVHeader(
 		}
 
 		context.UpdateForInputRecord()
-		recordsAndContexts.PushBack(types.NewRecordAndContext(record, context))
+		recordsAndContexts = append(recordsAndContexts, types.NewRecordAndContext(record, context))
 	}
 
 	return recordsAndContexts, false
diff --git a/pkg/input/record_reader_dkvp_nidx.go b/pkg/input/record_reader_dkvp_nidx.go
index efc0ae385..135a61660 100644
--- a/pkg/input/record_reader_dkvp_nidx.go
+++ b/pkg/input/record_reader_dkvp_nidx.go
@@ -3,7 +3,6 @@
 package input
 
 import (
-	"container/list"
 	"io"
 	"strconv"
 	"strings"
@@ -55,7 +54,7 @@ func NewRecordReaderNIDX(
 func (reader *RecordReaderDKVPNIDX) Read(
 	filenames []string,
 	context types.Context,
-	readerChannel chan<- *list.List, // list of *types.RecordAndContext
+	readerChannel chan<- []*types.RecordAndContext,
 	errorChannel chan error,
 	downstreamDoneChannel <-chan bool, // for mlr head
 ) {
@@ -95,7 +94,7 @@ func (reader *RecordReaderDKVPNIDX) processHandle(
 	handle io.Reader,
 	filename string,
 	context *types.Context,
-	readerChannel chan<- *list.List,
+	readerChannel chan<- []*types.RecordAndContext,
 	errorChannel chan<- error,
 	downstreamDoneChannel <-chan bool, // for mlr head
 ) {
@@ -103,12 +102,12 @@ func (reader *RecordReaderDKVPNIDX) processHandle(
 	recordsPerBatch := reader.recordsPerBatch
 	lineReader := NewLineReader(handle, reader.readerOptions.IRS)
 
-	linesChannel := make(chan *list.List, recordsPerBatch)
+	linesChannel := make(chan []string, recordsPerBatch)
 	go channelizedLineReader(lineReader, linesChannel, downstreamDoneChannel, recordsPerBatch)
 
 	for {
 		recordsAndContexts, eof := reader.getRecordBatch(linesChannel, errorChannel, context)
-		if recordsAndContexts.Len() > 0 {
+		if len(recordsAndContexts) > 0 {
 			readerChannel <- recordsAndContexts
 		}
 		if eof {
@@ -119,29 +118,27 @@ func (reader *RecordReaderDKVPNIDX) processHandle(
 
 // TODO: comment copiously we're trying to handle slow/fast/short/long reads: tail -f, smallfile, bigfile.
 func (reader *RecordReaderDKVPNIDX) getRecordBatch(
-	linesChannel <-chan *list.List,
+	linesChannel <-chan []string,
 	errorChannel chan<- error,
 	context *types.Context,
 ) (
-	recordsAndContexts *list.List,
+	recordsAndContexts []*types.RecordAndContext,
 	eof bool,
 ) {
-	recordsAndContexts = list.New()
+	recordsAndContexts = make([]*types.RecordAndContext, 0, reader.recordsPerBatch)
 
 	lines, more := <-linesChannel
 	if !more {
 		return recordsAndContexts, true
 	}
 
-	for e := lines.Front(); e != nil; e = e.Next() {
-		line := e.Value.(string)
-
+	for _, line := range lines {
 		// Check for comments-in-data feature
 		// TODO: function-pointer this away
 		if reader.readerOptions.CommentHandling != cli.CommentsAreData {
 			if strings.HasPrefix(line, reader.readerOptions.CommentString) {
 				if reader.readerOptions.CommentHandling == cli.PassComments {
-					recordsAndContexts.PushBack(types.NewOutputString(line+"\n", context))
+					recordsAndContexts = append(recordsAndContexts, types.NewOutputString(line+"\n", context))
 					continue
 				} else if reader.readerOptions.CommentHandling == cli.SkipComments {
 					continue
@@ -157,7 +154,7 @@ func (reader *RecordReaderDKVPNIDX) getRecordBatch(
 		}
 		context.UpdateForInputRecord()
 		recordAndContext := types.NewRecordAndContext(record, context)
-		recordsAndContexts.PushBack(recordAndContext)
+		recordsAndContexts = append(recordsAndContexts, recordAndContext)
 	}
 
 	return recordsAndContexts, false
diff --git a/pkg/input/record_reader_json.go b/pkg/input/record_reader_json.go
index 63d9f7368..c45b7d2ef 100644
--- a/pkg/input/record_reader_json.go
+++ b/pkg/input/record_reader_json.go
@@ -1,7 +1,6 @@
 package input
 
 import (
-	"container/list"
 	"fmt"
 	"io"
 	"strings"
@@ -34,7 +33,7 @@ func NewRecordReaderJSON(
 func (reader *RecordReaderJSON) Read(
 	filenames []string,
 	context types.Context,
-	readerChannel chan<- *list.List, // list of *types.RecordAndContext
+	readerChannel chan<- []*types.RecordAndContext,
 	errorChannel chan error,
 	downstreamDoneChannel <-chan bool, // for mlr head
 ) {
@@ -75,7 +74,7 @@ func (reader *RecordReaderJSON) processHandle(
 	handle io.Reader,
 	filename string,
 	context *types.Context,
-	readerChannel chan<- *list.List, // list of *types.RecordAndContext
+	readerChannel chan<- []*types.RecordAndContext,
 	errorChannel chan error,
 	downstreamDoneChannel <-chan bool, // for mlr head
 ) {
@@ -87,7 +86,7 @@ func (reader *RecordReaderJSON) processHandle(
 		handle = NewJSONCommentEnabledReader(handle, reader.readerOptions, readerChannel)
 	}
 	decoder := json.NewDecoder(handle)
-	recordsAndContexts := list.New()
+	recordsAndContexts := make([]*types.RecordAndContext, 0, reader.recordsPerBatch)
 	eof := false
 	i := int64(0)
 
@@ -132,11 +131,11 @@ func (reader *RecordReaderJSON) processHandle(
 				return
 			}
 			context.UpdateForInputRecord()
-			recordsAndContexts.PushBack(types.NewRecordAndContext(record, context))
+			recordsAndContexts = append(recordsAndContexts, types.NewRecordAndContext(record, context))
 
-			if int64(recordsAndContexts.Len()) >= recordsPerBatch {
+			if int64(len(recordsAndContexts)) >= recordsPerBatch {
 				readerChannel <- recordsAndContexts
-				recordsAndContexts = list.New()
+				recordsAndContexts = make([]*types.RecordAndContext, 0, reader.recordsPerBatch)
 			}
 
 		} else if mlrval.IsArray() {
@@ -164,11 +163,11 @@ func (reader *RecordReaderJSON) processHandle(
 					return
 				}
 				context.UpdateForInputRecord()
-				recordsAndContexts.PushBack(types.NewRecordAndContext(record, context))
+				recordsAndContexts = append(recordsAndContexts, types.NewRecordAndContext(record, context))
 
-				if int64(recordsAndContexts.Len()) >= recordsPerBatch {
+				if int64(len(recordsAndContexts)) >= recordsPerBatch {
 					readerChannel <- recordsAndContexts
-					recordsAndContexts = list.New()
+					recordsAndContexts = make([]*types.RecordAndContext, 0, reader.recordsPerBatch)
 				}
 
 			}
@@ -181,7 +180,7 @@ func (reader *RecordReaderJSON) processHandle(
 		}
 	}
 
-	if recordsAndContexts.Len() > 0 {
+	if len(recordsAndContexts) > 0 {
 		readerChannel <- recordsAndContexts
 	}
 }
@@ -211,8 +210,8 @@ func (reader *RecordReaderJSON) processHandle(
 type JSONCommentEnabledReader struct {
 	lineReader    ILineReader
 	readerOptions *cli.TReaderOptions
-	context       *types.Context    // Needed for channelized stdout-printing logic
-	readerChannel chan<- *list.List // list of *types.RecordAndContext
+	context       *types.Context // Needed for channelized stdout-printing logic
+	readerChannel chan<- []*types.RecordAndContext
 
 	// In case a line was ingested which was longer than the read-buffer passed
 	// to us, in which case we need to split up that line and return it over
@@ -223,7 +222,7 @@ type JSONCommentEnabledReader struct {
 func NewJSONCommentEnabledReader(
 	underlying io.Reader,
 	readerOptions *cli.TReaderOptions,
-	readerChannel chan<- *list.List, // list of *types.RecordAndContext
+	readerChannel chan<- []*types.RecordAndContext,
 ) *JSONCommentEnabledReader {
 	return &JSONCommentEnabledReader{
 		lineReader: NewLineReader(underlying, "\n"),
@@ -260,8 +259,7 @@ func (bsr *JSONCommentEnabledReader) Read(p []byte) (n int, err error) {
 	if bsr.readerOptions.CommentHandling == cli.PassComments {
 		// Insert the string into the record-output stream, so that goroutine can
 		// print it, resulting in deterministic output-ordering.
-		ell := list.New()
-		ell.PushBack(types.NewOutputString(line+"\n", bsr.context))
+		ell := []*types.RecordAndContext{types.NewOutputString(line+"\n", bsr.context)}
 		bsr.readerChannel <- ell
 	}
diff --git a/pkg/input/record_reader_pprint.go b/pkg/input/record_reader_pprint.go
index aad87769c..fa1452ca8 100644
--- a/pkg/input/record_reader_pprint.go
+++ b/pkg/input/record_reader_pprint.go
@@ -1,7 +1,6 @@
 package input
 
 import (
-	"container/list"
 	"fmt"
 	"io"
 	"regexp"
@@ -73,19 +72,19 @@ type RecordReaderPprintBarredOrMarkdown struct {
 // implicit-PPRINT-header record-batch getter.
 type recordBatchGetterPprint func(
 	reader *RecordReaderPprintBarredOrMarkdown,
-	linesChannel <-chan *list.List,
+	linesChannel <-chan []string,
 	filename string,
 	context *types.Context,
 	errorChannel chan error,
 ) (
-	recordsAndContexts *list.List,
+	recordsAndContexts []*types.RecordAndContext,
 	eof bool,
 )
 
 func (reader *RecordReaderPprintBarredOrMarkdown) Read(
 	filenames []string,
 	context types.Context,
-	readerChannel chan<- *list.List, // list of *types.RecordAndContext
+	readerChannel chan<- []*types.RecordAndContext,
 	errorChannel chan error,
 	downstreamDoneChannel <-chan bool, // for mlr head
 ) {
@@ -139,7 +138,7 @@ func (reader *RecordReaderPprintBarredOrMarkdown) processHandle(
 	handle io.Reader,
 	filename string,
 	context *types.Context,
-	readerChannel chan<- *list.List, // list of *types.RecordAndContext
+	readerChannel chan<- []*types.RecordAndContext,
 	errorChannel chan error,
 	downstreamDoneChannel <-chan bool, // for mlr head
 ) {
@@ -149,12 +148,12 @@ func (reader *RecordReaderPprintBarredOrMarkdown) processHandle(
 	recordsPerBatch := reader.recordsPerBatch
 	lineReader := NewLineReader(handle, reader.readerOptions.IRS)
 
-	linesChannel := make(chan *list.List, recordsPerBatch)
+	linesChannel := make(chan []string, recordsPerBatch)
 	go channelizedLineReader(lineReader, linesChannel, downstreamDoneChannel, recordsPerBatch)
 
 	for {
 		recordsAndContexts, eof := reader.recordBatchGetter(reader, linesChannel, filename, context, errorChannel)
-		if recordsAndContexts.Len() > 0 {
+		if len(recordsAndContexts) > 0 {
 			readerChannel <- recordsAndContexts
 		}
 		if eof {
@@ -165,15 +164,15 @@ func (reader *RecordReaderPprintBarredOrMarkdown) processHandle(
 
 func getRecordBatchExplicitPprintHeader(
 	reader *RecordReaderPprintBarredOrMarkdown,
-	linesChannel <-chan *list.List,
+	linesChannel <-chan []string,
 	filename string,
 	context *types.Context,
 	errorChannel chan error,
 ) (
-	recordsAndContexts *list.List,
+	recordsAndContexts []*types.RecordAndContext,
 	eof bool,
 ) {
-	recordsAndContexts = list.New()
+	recordsAndContexts = make([]*types.RecordAndContext, 0, reader.recordsPerBatch)
 	dedupeFieldNames := reader.readerOptions.DedupeFieldNames
 
 	lines, more := <-linesChannel
@@ -181,9 +180,7 @@ func getRecordBatchExplicitPprintHeader(
 		return recordsAndContexts, true
 	}
 
-	for e := lines.Front(); e != nil; e = e.Next() {
-		line := e.Value.(string)
-
+	for _, line := range lines {
 		reader.inputLineNumber++
 
 		// Check for comments-in-data feature
@@ -191,7 +188,7 @@ func getRecordBatchExplicitPprintHeader(
 		if reader.readerOptions.CommentHandling != cli.CommentsAreData {
 			if strings.HasPrefix(line, reader.readerOptions.CommentString) {
 				if reader.readerOptions.CommentHandling == cli.PassComments {
-					recordsAndContexts.PushBack(types.NewOutputString(line+"\n", context))
+					recordsAndContexts = append(recordsAndContexts, types.NewOutputString(line+"\n", context))
 					continue
 				} else if reader.readerOptions.CommentHandling == cli.SkipComments {
 					continue
@@ -292,7 +289,7 @@ func getRecordBatchExplicitPprintHeader(
 			}
 
 			context.UpdateForInputRecord()
-			recordsAndContexts.PushBack(types.NewRecordAndContext(record, context))
+			recordsAndContexts = append(recordsAndContexts, types.NewRecordAndContext(record, context))
 		}
 	}
 
@@ -302,15 +299,15 @@ func getRecordBatchExplicitPprintHeader(
 
 func getRecordBatchImplicitPprintHeader(
 	reader *RecordReaderPprintBarredOrMarkdown,
-	linesChannel <-chan *list.List,
+	linesChannel <-chan []string,
 	filename string,
 	context *types.Context,
 	errorChannel chan error,
 ) (
-	recordsAndContexts *list.List,
+	recordsAndContexts []*types.RecordAndContext,
 	eof bool,
 ) {
-	recordsAndContexts = list.New()
+	recordsAndContexts = make([]*types.RecordAndContext, 0, reader.recordsPerBatch)
 	dedupeFieldNames := reader.readerOptions.DedupeFieldNames
 
 	lines, more := <-linesChannel
@@ -318,9 +315,7 @@ func getRecordBatchImplicitPprintHeader(
 		return recordsAndContexts, true
 	}
 
-	for e := lines.Front(); e != nil; e = e.Next() {
-		line := e.Value.(string)
-
+	for _, line := range lines {
 		reader.inputLineNumber++
 
 		// Check for comments-in-data feature
@@ -328,7 +323,7 @@ func getRecordBatchImplicitPprintHeader(
 		if reader.readerOptions.CommentHandling != cli.CommentsAreData {
 			if strings.HasPrefix(line, reader.readerOptions.CommentString) {
 				if reader.readerOptions.CommentHandling == cli.PassComments {
-					recordsAndContexts.PushBack(types.NewOutputString(line+"\n", context))
+					recordsAndContexts = append(recordsAndContexts, types.NewOutputString(line+"\n", context))
 					continue
 				} else if reader.readerOptions.CommentHandling == cli.SkipComments {
 					continue
@@ -436,7 +431,7 @@ func getRecordBatchImplicitPprintHeader(
 		}
 
 		context.UpdateForInputRecord()
-		recordsAndContexts.PushBack(types.NewRecordAndContext(record, context))
+		recordsAndContexts = append(recordsAndContexts, types.NewRecordAndContext(record, context))
 	}
 
 	return recordsAndContexts, false
diff --git a/pkg/input/record_reader_tsv.go b/pkg/input/record_reader_tsv.go
index f70042bbe..1b86f5c81 100644
--- a/pkg/input/record_reader_tsv.go
+++ b/pkg/input/record_reader_tsv.go
@@ -1,7 +1,6 @@
 package input
 
 import (
-	"container/list"
 	"fmt"
 	"io"
 	"strconv"
@@ -17,12 +16,12 @@ import (
 // implicit-TSV-header record-batch getter.
 type recordBatchGetterTSV func(
 	reader *RecordReaderTSV,
-	linesChannel <-chan *list.List,
+	linesChannel <-chan []string,
 	filename string,
 	context *types.Context,
 	errorChannel chan error,
 ) (
-	recordsAndContexts *list.List,
+	recordsAndContexts []*types.RecordAndContext,
 	eof bool,
 )
 
@@ -63,7 +62,7 @@ func NewRecordReaderTSV(
 func (reader *RecordReaderTSV) Read(
 	filenames []string,
 	context types.Context,
-	readerChannel chan<- *list.List, // list of *types.RecordAndContext
+	readerChannel chan<- []*types.RecordAndContext,
 	errorChannel chan error,
 	downstreamDoneChannel <-chan bool, // for mlr head
 ) {
@@ -117,7 +116,7 @@ func (reader *RecordReaderTSV) processHandle(
 	handle io.Reader,
 	filename string,
 	context *types.Context,
-	readerChannel chan<- *list.List, // list of *types.RecordAndContext
+	readerChannel chan<- []*types.RecordAndContext,
 	errorChannel chan error,
 	downstreamDoneChannel <-chan bool, // for mlr head
 ) {
@@ -127,12 +126,12 @@ func (reader *RecordReaderTSV) processHandle(
 	recordsPerBatch := reader.recordsPerBatch
 	lineReader := NewLineReader(handle, reader.readerOptions.IRS)
 
-	linesChannel := make(chan *list.List, recordsPerBatch)
+	linesChannel := make(chan []string, recordsPerBatch)
 	go channelizedLineReader(lineReader, linesChannel, downstreamDoneChannel, recordsPerBatch)
 
 	for {
 		recordsAndContexts, eof := reader.recordBatchGetter(reader, linesChannel, filename, context, errorChannel)
-		if recordsAndContexts.Len() > 0 {
+		if len(recordsAndContexts) > 0 {
 			readerChannel <- recordsAndContexts
 		}
 		if eof {
@@ -143,15 +142,15 @@ func (reader *RecordReaderTSV) processHandle(
 
 func getRecordBatchExplicitTSVHeader(
 	reader *RecordReaderTSV,
-	linesChannel <-chan *list.List,
+	linesChannel <-chan []string,
 	filename string,
 	context *types.Context,
 	errorChannel chan error,
 ) (
-	recordsAndContexts *list.List,
+	recordsAndContexts []*types.RecordAndContext,
 	eof bool,
 ) {
-	recordsAndContexts = list.New()
+	recordsAndContexts = make([]*types.RecordAndContext, 0, reader.recordsPerBatch)
 	dedupeFieldNames := reader.readerOptions.DedupeFieldNames
 
 	lines, more := <-linesChannel
@@ -159,9 +158,7 @@ func getRecordBatchExplicitTSVHeader(
 		return recordsAndContexts, true
 	}
 
-	for e := lines.Front(); e != nil; e = e.Next() {
-		line := e.Value.(string)
-
+	for _, line := range lines {
 		reader.inputLineNumber++
 
 		// Check for comments-in-data feature
@@ -169,7 +166,7 @@ func getRecordBatchExplicitTSVHeader(
 		if reader.readerOptions.CommentHandling != cli.CommentsAreData {
 			if strings.HasPrefix(line, reader.readerOptions.CommentString) {
 				if reader.readerOptions.CommentHandling == cli.PassComments {
-					recordsAndContexts.PushBack(types.NewOutputString(line+"\n", context))
+					recordsAndContexts = append(recordsAndContexts, types.NewOutputString(line+"\n", context))
 					continue
 				} else if reader.readerOptions.CommentHandling == cli.SkipComments {
 					continue
@@ -240,7 +237,7 @@ func getRecordBatchExplicitTSVHeader(
 			}
 
 			context.UpdateForInputRecord()
-			recordsAndContexts.PushBack(types.NewRecordAndContext(record, context))
+			recordsAndContexts = append(recordsAndContexts, types.NewRecordAndContext(record, context))
 		}
 	}
 
@@ -249,15 +246,15 @@ func getRecordBatchExplicitTSVHeader(
 
 func getRecordBatchImplicitTSVHeader(
 	reader *RecordReaderTSV,
-	linesChannel <-chan *list.List,
+	linesChannel <-chan []string,
 	filename string,
 	context *types.Context,
 	errorChannel chan error,
 ) (
-	recordsAndContexts *list.List,
+	recordsAndContexts []*types.RecordAndContext,
 	eof bool,
 ) {
-	recordsAndContexts = list.New()
+	recordsAndContexts = make([]*types.RecordAndContext, 0, reader.recordsPerBatch)
 	dedupeFieldNames := reader.readerOptions.DedupeFieldNames
 
 	lines, more := <-linesChannel
@@ -265,9 +262,7 @@ func getRecordBatchImplicitTSVHeader(
 		return recordsAndContexts, true
 	}
 
-	for e := lines.Front(); e != nil; e = e.Next() {
-		line := e.Value.(string)
-
+	for _, line := range lines {
 		reader.inputLineNumber++
 
 		// Check for comments-in-data feature
@@ -275,7 +270,7 @@ func getRecordBatchImplicitTSVHeader(
 		if reader.readerOptions.CommentHandling != cli.CommentsAreData {
 			if strings.HasPrefix(line, reader.readerOptions.CommentString) {
 				if reader.readerOptions.CommentHandling == cli.PassComments {
-					recordsAndContexts.PushBack(types.NewOutputString(line+"\n", context))
+					recordsAndContexts = append(recordsAndContexts, types.NewOutputString(line+"\n", context))
 					continue
 				} else if reader.readerOptions.CommentHandling == cli.SkipComments {
 					continue
@@ -363,7 +358,7 @@ func getRecordBatchImplicitTSVHeader(
 		}
 
 		context.UpdateForInputRecord()
-		recordsAndContexts.PushBack(types.NewRecordAndContext(record, context))
+		recordsAndContexts = append(recordsAndContexts, types.NewRecordAndContext(record, context))
 	}
 
 	return recordsAndContexts, false
diff --git a/pkg/input/record_reader_xtab.go b/pkg/input/record_reader_xtab.go
index 5d1530007..1c90008cd 100644
--- a/pkg/input/record_reader_xtab.go
+++ b/pkg/input/record_reader_xtab.go
@@ -1,7 +1,6 @@
 package input
 
 import (
-	"container/list"
 	"fmt"
 	"io"
 	"os"
@@ -33,14 +32,14 @@ type RecordReaderXTAB struct {
 // 500 or so). This struct helps us keep each stanza's comment lines along with
 // the stanza they originated in.
 type tStanza struct {
-	dataLines    *list.List
-	commentLines *list.List
+	dataLines    []string
+	commentLines []string
 }
 
-func newStanza() *tStanza {
+func newStanza(recordsPerBatch int64) *tStanza {
 	return &tStanza{
-		dataLines:    list.New(),
-		commentLines: list.New(),
+		dataLines:    make([]string, 0, recordsPerBatch),
+		commentLines: make([]string, 0, recordsPerBatch),
 	}
 }
 
@@ -58,7 +57,7 @@ func NewRecordReaderXTAB(
 func (reader *RecordReaderXTAB) Read(
 	filenames []string,
 	context types.Context,
-	readerChannel chan<- *list.List, // list of *types.RecordAndContext
+	readerChannel chan<- []*types.RecordAndContext,
 	errorChannel chan error,
 	downstreamDoneChannel <-chan bool, // for mlr head
 ) {
@@ -98,7 +97,7 @@ func (reader *RecordReaderXTAB) processHandle(
 	handle io.Reader,
 	filename string,
 	context *types.Context,
-	readerChannel chan<- *list.List, // list of *types.RecordAndContext
+	readerChannel chan<- []*types.RecordAndContext,
 	errorChannel chan error,
 	downstreamDoneChannel <-chan bool, // for mlr head
 ) {
@@ -108,13 +107,13 @@ func (reader *RecordReaderXTAB) processHandle(
 
 	// XTAB uses repeated IFS, rather than IRS, to delimit records
 	lineReader := NewLineReader(handle, reader.readerOptions.IFS)
 
-	stanzasChannel := make(chan *list.List, recordsPerBatch)
+	stanzasChannel := make(chan []*tStanza, recordsPerBatch)
 	go channelizedStanzaScanner(lineReader, reader.readerOptions, stanzasChannel, downstreamDoneChannel, recordsPerBatch)
 
 	for {
 		recordsAndContexts, eof := reader.getRecordBatch(stanzasChannel, context, errorChannel)
-		if recordsAndContexts.Len() > 0 {
+		if len(recordsAndContexts) > 0 {
 			readerChannel <- recordsAndContexts
 		}
 		if eof {
@@ -140,7 +139,7 @@ func (reader *RecordReaderXTAB) processHandle(
 func channelizedStanzaScanner(
 	lineReader ILineReader,
 	readerOptions *cli.TReaderOptions,
-	stanzasChannel chan<- *list.List, // list of list of string
+	stanzasChannel chan<- []*tStanza,
 	downstreamDoneChannel <-chan bool, // for mlr head
 	recordsPerBatch int64,
 ) {
@@ -148,8 +147,8 @@ func channelizedStanzaScanner(
 	inStanza := false
 	done := false
 
-	stanzas := list.New()
-	stanza := newStanza()
+	stanzas := make([]*tStanza, 0, recordsPerBatch)
+	stanza := newStanza(recordsPerBatch)
 
 	for {
 		line, err := lineReader.Read()
@@ -168,7 +167,7 @@ func channelizedStanzaScanner(
 		if readerOptions.CommentHandling != cli.CommentsAreData {
 			if strings.HasPrefix(line, readerOptions.CommentString) {
 				if readerOptions.CommentHandling == cli.PassComments {
-					stanza.commentLines.PushBack(line)
+					stanza.commentLines = append(stanza.commentLines, line)
 					continue
 				} else if readerOptions.CommentHandling == cli.SkipComments {
 					continue
@@ -184,9 +183,9 @@ func channelizedStanzaScanner(
 			// 3. At end of file, multiple empty lines are ignored.
 			if inStanza {
 				inStanza = false
-				stanzas.PushBack(stanza)
+				stanzas = append(stanzas, stanza)
 				numStanzasSeen++
-				stanza = newStanza()
+				stanza = newStanza(recordsPerBatch)
 			} else {
 				continue
 			}
@@ -194,7 +193,7 @@ func channelizedStanzaScanner(
 			if !inStanza {
 				inStanza = true
 			}
-			stanza.dataLines.PushBack(line)
+			stanza.dataLines = append(stanza.dataLines, line)
 		}
 
 		// See if downstream processors will be ignoring further data (e.g. mlr
@@ -212,7 +211,7 @@ func channelizedStanzaScanner(
 			break
 		}
 		stanzasChannel <- stanzas
-		stanzas = list.New()
+		stanzas = make([]*tStanza, 0, recordsPerBatch)
 	}
 
 	if done {
@@ -222,8 +221,8 @@ func channelizedStanzaScanner(
 
 	// The last stanza may not have a trailing newline after it. Any lines in the stanza
 	// at this point will form the final record in the stream.
-	if stanza.dataLines.Len() > 0 || stanza.commentLines.Len() > 0 {
-		stanzas.PushBack(stanza)
+	if len(stanza.dataLines) > 0 || len(stanza.commentLines) > 0 {
+		stanzas = append(stanzas, stanza)
 	}
 
 	stanzasChannel <- stanzas
@@ -232,38 +231,35 @@ func channelizedStanzaScanner(
 }
 
 // TODO: comment copiously we're trying to handle slow/fast/short/long reads: tail -f, smallfile, bigfile.
 func (reader *RecordReaderXTAB) getRecordBatch(
-	stanzasChannel <-chan *list.List,
+	stanzasChannel <-chan []*tStanza,
 	context *types.Context,
 	errorChannel chan error,
 ) (
-	recordsAndContexts *list.List,
+	recordsAndContexts []*types.RecordAndContext,
 	eof bool,
 ) {
-	recordsAndContexts = list.New()
+	recordsAndContexts = make([]*types.RecordAndContext, 0, reader.recordsPerBatch)
 
 	stanzas, more := <-stanzasChannel
 	if !more {
 		return recordsAndContexts, true
 	}
 
-	for e := stanzas.Front(); e != nil; e = e.Next() {
-		stanza := e.Value.(*tStanza)
-
-		if stanza.commentLines.Len() > 0 {
-			for f := stanza.commentLines.Front(); f != nil; f = f.Next() {
-				line := f.Value.(string)
-				recordsAndContexts.PushBack(types.NewOutputString(line+reader.readerOptions.IFS, context))
+	for _, stanza := range stanzas {
+		if len(stanza.commentLines) > 0 {
+			for _, line := range stanza.commentLines {
+				recordsAndContexts = append(recordsAndContexts, types.NewOutputString(line+reader.readerOptions.IFS, context))
 			}
 		}
-		if stanza.dataLines.Len() > 0 {
+		if len(stanza.dataLines) > 0 {
 			record, err := reader.recordFromXTABLines(stanza.dataLines)
 			if err != nil {
 				errorChannel <- err
 				return
 			}
 			context.UpdateForInputRecord()
-			recordsAndContexts.PushBack(types.NewRecordAndContext(record, context))
+			recordsAndContexts = append(recordsAndContexts, types.NewRecordAndContext(record, context))
 		}
 	}
 
@@ -271,14 +267,12 @@ func (reader *RecordReaderXTAB) getRecordBatch(
 }
 
 func (reader *RecordReaderXTAB) recordFromXTABLines(
-	stanza *list.List,
+	lines []string,
 ) (*mlrval.Mlrmap, error) {
 	record := mlrval.NewMlrmapAsRecord()
 	dedupeFieldNames := reader.readerOptions.DedupeFieldNames
 
-	for e := stanza.Front(); e != nil; e = e.Next() {
-		line := e.Value.(string)
-
+	for _, line := range lines {
 		key, value, err := reader.pairSplitter.Split(line)
 		if err != nil {
 			return nil, err
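With the lists gone, `channelizedLineReader` and `channelizedStanzaScanner` share the same shape: accumulate into a slice, send the slice over the channel, then allocate a fresh slice rather than truncating the old one, since the consumer owns a batch once it is sent and reusing its backing array would race with the consumer. A reduced sketch of that producer contract (hypothetical names, independent of Miller's types):

```go
package main

import "fmt"

// produceBatches sends items in slices of up to batchSize, checking a
// done-channel between batches (as the readers do for 'mlr head'). A fresh
// slice is allocated after each send because the consumer owns sent batches.
func produceBatches(items []string, batchSize int, out chan<- []string, done <-chan bool) {
	batch := make([]string, 0, batchSize)
	for _, item := range items {
		batch = append(batch, item)
		if len(batch) >= batchSize {
			select {
			case <-done: // downstream has seen enough; stop producing
				close(out)
				return
			default:
			}
			out <- batch
			batch = make([]string, 0, batchSize)
		}
	}
	if len(batch) > 0 {
		out <- batch // final short batch, e.g. EOF without a trailing delimiter
	}
	close(out)
}

func main() {
	out := make(chan []string, 2)
	done := make(chan bool)
	go produceBatches([]string{"a=1", "b=2", "c=3"}, 2, out, done)
	for batch := range out {
		fmt.Println(batch) // [a=1 b=2], then [c=3]
	}
}
```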
diff --git a/pkg/output/channel_writer.go b/pkg/output/channel_writer.go
index 86be3324a..7f078f1f8 100644
--- a/pkg/output/channel_writer.go
+++ b/pkg/output/channel_writer.go
@@ -2,7 +2,6 @@ package output
 
 import (
 	"bufio"
-	"container/list"
 	"fmt"
 	"os"
 
@@ -11,7 +10,7 @@ import (
 )
 
 func ChannelWriter(
-	writerChannel <-chan *list.List, // list of *types.RecordAndContext
+	writerChannel <-chan []*types.RecordAndContext,
 	recordWriter IRecordWriter,
 	writerOptions *cli.TWriterOptions,
 	doneChannel chan<- bool,
@@ -45,15 +44,14 @@ func ChannelWriter(
 // TODO: comment
 // Returns true on end of record stream
 func channelWriterHandleBatch(
-	recordsAndContexts *list.List,
+	recordsAndContexts []*types.RecordAndContext,
 	recordWriter IRecordWriter,
 	writerOptions *cli.TWriterOptions,
 	dataProcessingErrorChannel chan<- bool,
 	bufferedOutputStream *bufio.Writer,
 	outputIsStdout bool,
 ) (done bool, errored bool) {
-	for e := recordsAndContexts.Front(); e != nil; e = e.Next() {
-		recordAndContext := e.Value.(*types.RecordAndContext)
+	for _, recordAndContext := range recordsAndContexts {
 
 		// Three things can come through:
 		// * End-of-stream marker
diff --git a/pkg/output/file_output_handlers.go b/pkg/output/file_output_handlers.go
index 31f6b89a0..7f92e6c23 100644
--- a/pkg/output/file_output_handlers.go
+++ b/pkg/output/file_output_handlers.go
@@ -14,7 +14,6 @@ package output
 
 import (
 	"bufio"
-	"container/list"
 	"errors"
 	"fmt"
 	"io"
@@ -216,7 +215,7 @@ type FileOutputHandler struct {
 	// print and dump variants call WriteString.
 	recordWriterOptions  *cli.TWriterOptions
 	recordWriter         IRecordWriter
-	recordOutputChannel  chan *list.List // list of *types.RecordAndContext
+	recordOutputChannel  chan []*types.RecordAndContext
 	recordDoneChannel    chan bool
 	recordErroredChannel chan bool
 }
@@ -352,8 +351,7 @@ func (handler *FileOutputHandler) WriteRecordAndContext(
 	}
 
 	// TODO: myybe refactor to batch better
-	ell := list.New()
-	ell.PushBack(outrecAndContext)
+	ell := []*types.RecordAndContext{outrecAndContext}
 	handler.recordOutputChannel <- ell
 	return nil
 }
@@ -369,7 +367,7 @@ func (handler *FileOutputHandler) setUpRecordWriter() error {
 	}
 
 	handler.recordWriter = recordWriter
-	handler.recordOutputChannel = make(chan *list.List, 1) // list of *types.RecordAndContext
+	handler.recordOutputChannel = make(chan []*types.RecordAndContext, 1)
 
 	handler.recordDoneChannel = make(chan bool, 1)
 	handler.recordErroredChannel = make(chan bool, 1)
diff --git a/pkg/types/context.go b/pkg/types/context.go
index 6f82bc527..6bd0ee365 100644
--- a/pkg/types/context.go
+++ b/pkg/types/context.go
@@ -2,7 +2,6 @@ package types
 
 import (
 	"bytes"
-	"container/list"
 	"strconv"
 
 	"github.com/johnkerl/miller/v6/pkg/mlrval"
@@ -82,11 +81,10 @@ func NewEndOfStreamMarker(context *Context) *RecordAndContext {
 	}
 }
 
-// TODO: comment
 // For the record-readers to update their initial context as each new record is read.
-func NewEndOfStreamMarkerList(context *Context) *list.List {
-	ell := list.New()
-	ell.PushBack(NewEndOfStreamMarker(context))
+func NewEndOfStreamMarkerList(context *Context) []*RecordAndContext {
+	ell := make([]*RecordAndContext, 1)
+	ell[0] = NewEndOfStreamMarker(context)
 	return ell
 }
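The motivation for the whole conversion is allocation behavior: `container/list` allocates one `Element` per item and traversal chases pointers, while a capacity-hinted slice allocates once per batch and iterates contiguously. A rough micro-benchmark sketch, independent of Miller's types, that could be dropped into a `_test.go` file to check the effect (500 matches the batch size mentioned in the XTAB comments):

```go
package main

import (
	"container/list"
	"testing"
)

var sink int // package-level sink so the loops are not optimized away

// BenchmarkList builds and walks a container/list: one allocation per
// PushBack, pointer-chasing on traversal, plus a type assertion per element.
func BenchmarkList(b *testing.B) {
	for i := 0; i < b.N; i++ {
		l := list.New()
		for j := 0; j < 500; j++ {
			l.PushBack(j)
		}
		sum := 0
		for e := l.Front(); e != nil; e = e.Next() {
			sum += e.Value.(int)
		}
		sink = sum
	}
}

// BenchmarkSlice builds and ranges over a capacity-hinted slice: one
// allocation up front, contiguous memory, no type assertions.
func BenchmarkSlice(b *testing.B) {
	for i := 0; i < b.N; i++ {
		s := make([]int, 0, 500)
		for j := 0; j < 500; j++ {
			s = append(s, j)
		}
		sum := 0
		for _, v := range s {
			sum += v
		}
		sink = sum
	}
}
```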