|
@@ -5,14 +5,18 @@
|
|
|
package traffic
|
|
|
|
|
|
import (
|
|
|
+ "bufio"
|
|
|
+ "context"
|
|
|
"encoding/csv"
|
|
|
"fmt"
|
|
|
"io"
|
|
|
+ "log"
|
|
|
"os"
|
|
|
"path/filepath"
|
|
|
"strings"
|
|
|
|
|
|
"git.sr.ht/~sircmpwn/go-bare"
|
|
|
+ "github.com/lanrat/extsort"
|
|
|
)
|
|
|
|
|
|
func readInputRoutesIndex(c feedConverter) (feedConverter, error) {
|
|
@@ -77,6 +81,7 @@ func convertDepartures(c feedConverter) (feedConverter, error) { // O(n:stop_tim
|
|
|
}
|
|
|
defer tripsFile.Close()
|
|
|
|
|
|
+ // TODO unnecessary overhead, should parse single csv header line
|
|
|
trips := csv.NewReader(tripsFile)
|
|
|
tripsHeader, err := trips.Read()
|
|
|
if err != nil {
|
|
@@ -92,10 +97,11 @@ func convertDepartures(c feedConverter) (feedConverter, error) { // O(n:stop_tim
|
|
|
if err != nil {
|
|
|
return c, fmt.Errorf("while opening stops file: %w", err)
|
|
|
}
|
|
|
- defer tripsFile.Close()
|
|
|
+ defer stopsFile.Close()
|
|
|
|
|
|
- stops := csv.NewReader(tripsFile)
|
|
|
- stopsHeader, err := trips.Read()
|
|
|
+ // TODO unnecessary overhead, should parse single csv header line
|
|
|
+ stops := csv.NewReader(stopsFile)
|
|
|
+ stopsHeader, err := stops.Read()
|
|
|
if err != nil {
|
|
|
return c, fmt.Errorf("while reading stops header: %w", err)
|
|
|
}
|
|
@@ -109,10 +115,11 @@ func convertDepartures(c feedConverter) (feedConverter, error) { // O(n:stop_tim
|
|
|
if err != nil {
|
|
|
return c, fmt.Errorf("while opening routes file: %w", err)
|
|
|
}
|
|
|
- defer tripsFile.Close()
|
|
|
+ defer routesFile.Close()
|
|
|
|
|
|
- routes := csv.NewReader(tripsFile)
|
|
|
- routesHeader, err := trips.Read()
|
|
|
+ // TODO unnecessary overhead, should parse single csv header line
|
|
|
+ routes := csv.NewReader(routesFile)
|
|
|
+ routesHeader, err := routes.Read()
|
|
|
if err != nil {
|
|
|
return c, fmt.Errorf("while reading routes header: %w", err)
|
|
|
}
|
|
@@ -147,14 +154,12 @@ func convertDepartures(c feedConverter) (feedConverter, error) { // O(n:stop_tim
|
|
|
stopID := record[fields["stop_id"]]
|
|
|
|
|
|
if previousTrip != tripID && previousTrip != "" {
|
|
|
- tripsFile.Seek(c.tripsInputIndex[tripID], 0)
|
|
|
- tripRecord, err := trips.Read()
|
|
|
+ tripRecord, err := readCsvLine(tripsFile, c.tripsInputIndex[tripID], len(tripsHeader))
|
|
|
if err != nil && err != io.EOF {
|
|
|
return fmt.Errorf("while reading a trips record: %w", err)
|
|
|
}
|
|
|
|
|
|
- routesFile.Seek(c.routesInputIndex[tripID], 0)
|
|
|
- routeRecord, err := routes.Read()
|
|
|
+ routeRecord, err := readCsvLine(routesFile, c.routesInputIndex[tripID], len(routesHeader))
|
|
|
if err != nil && err != io.EOF {
|
|
|
return fmt.Errorf("while reading a routes record: %w", err)
|
|
|
}
|
|
@@ -164,10 +169,9 @@ func convertDepartures(c feedConverter) (feedConverter, error) { // O(n:stop_tim
|
|
|
case HeadsignTripHeadsing:
|
|
|
trip.Headsign = record[fields["trip_headsign"]]
|
|
|
case HeadsignTripLastStop:
|
|
|
- stopsFile.Seek(c.stopsInputIndex[stopID], 0)
|
|
|
- stopRecord, err := stops.Read()
|
|
|
+ stopRecord, err := readCsvLine(stopsFile, c.stopsInputIndex[stopID], len(stopsHeader))
|
|
|
if err != nil && err != io.EOF {
|
|
|
- return fmt.Errorf("while reading a trips record: %w", err)
|
|
|
+ return fmt.Errorf("while reading a stops record: %w", err)
|
|
|
}
|
|
|
|
|
|
trip.Headsign = stopRecord[stopsFields["stop_name"]]
|
|
@@ -243,7 +247,94 @@ func convertDepartures(c feedConverter) (feedConverter, error) { // O(n:stop_tim
|
|
|
})
|
|
|
|
|
|
c.tripsOffsets = tripsOffsets
|
|
|
- // TODO after everything external-sort changeoptions.csv
|
|
|
- // TODO after everything external-sort tripsthroughstop.csv (e.g. https://github.com/lanrat/extsort)
|
|
|
return c, err
|
|
|
}
|
|
|
+
|
|
|
+func sortOutIndex(fileName string) error {
|
|
|
+ file, err := os.Open(fileName)
|
|
|
+ if err != nil {
|
|
|
+ return fmt.Errorf("while opening file: %w", err)
|
|
|
+ }
|
|
|
+ defer file.Close()
|
|
|
+
|
|
|
+ result, err := os.Create(fileName + "2")
|
|
|
+ if err != nil {
|
|
|
+ return fmt.Errorf("while creating file: %w", err)
|
|
|
+ }
|
|
|
+ defer result.Close()
|
|
|
+
|
|
|
+ scanner := bufio.NewScanner(file)
|
|
|
+ scanner.Scan()
|
|
|
+ headerLine := scanner.Text()
|
|
|
+
|
|
|
+ if err := scanner.Err(); err != nil {
|
|
|
+ return fmt.Errorf("while scanning: %w", err)
|
|
|
+ }
|
|
|
+ inputChan := make(chan string)
|
|
|
+ go func() {
|
|
|
+ for scanner.Scan() {
|
|
|
+ inputChan <- scanner.Text()
|
|
|
+ }
|
|
|
+ close(inputChan)
|
|
|
+ }()
|
|
|
+
|
|
|
+ sorter, outputChan, errChan := extsort.Strings(inputChan, nil)
|
|
|
+ sorter.Sort(context.Background())
|
|
|
+
|
|
|
+ result.WriteString(headerLine + "\n")
|
|
|
+ for data := range outputChan {
|
|
|
+ result.WriteString(data + "\n")
|
|
|
+ }
|
|
|
+
|
|
|
+ if err := <-errChan; err != nil {
|
|
|
+ return fmt.Errorf("while sorting: %w", err)
|
|
|
+ }
|
|
|
+
|
|
|
+ result.Close()
|
|
|
+ err = os.Rename(fileName+"2", fileName)
|
|
|
+ if err != nil {
|
|
|
+ return fmt.Errorf("while replacing file: %w", err)
|
|
|
+ }
|
|
|
+ return nil
|
|
|
+
|
|
|
+}
|
|
|
+
|
|
|
+func sortChangeOptions(c feedConverter) error {
|
|
|
+ return sortOutIndex(filepath.Join(c.TmpFeedPath, "changeoptions.csv"))
|
|
|
+}
|
|
|
+
|
|
|
+func sortTripsThroughStop(c feedConverter) error {
|
|
|
+ return sortOutIndex(filepath.Join(c.TmpFeedPath, "tripsthroughstop.csv"))
|
|
|
+}
|
|
|
+
|
|
|
+// TODO out to separate file
|
|
|
+func readCsvLine(r *os.File, offset int64, fields int) ([]string, error) {
|
|
|
+ if offset != -1 {
|
|
|
+ r.Seek(offset, 0)
|
|
|
+ }
|
|
|
+ line := []byte{}
|
|
|
+ for {
|
|
|
+ b := make([]byte, 1)
|
|
|
+ _, err := r.Read(b)
|
|
|
+ if err != nil {
|
|
|
+ if err == io.EOF {
|
|
|
+ break
|
|
|
+ }
|
|
|
+ return []string{}, fmt.Errorf("while reading byte: %w", err)
|
|
|
+ }
|
|
|
+ if b[0] == '\n' {
|
|
|
+ break
|
|
|
+ }
|
|
|
+ line = append(line, b[0])
|
|
|
+ }
|
|
|
+ // TODO unnecessary overhead, should parse single csv line, expecting $fields fields
|
|
|
+ csvReader := csv.NewReader(strings.NewReader(string(line)))
|
|
|
+ csvReader.FieldsPerRecord = fields
|
|
|
+ record, err := csvReader.Read()
|
|
|
+ if err != nil {
|
|
|
+ log.Printf("fields: %d\nline: %s\nrecord:%v\n", fields, string(line), record)
|
|
|
+ return record, fmt.Errorf("while reading record: %w", err)
|
|
|
+ }
|
|
|
+
|
|
|
+ return record, nil
|
|
|
+}
|