2 Коміти d4de60141a ... 89e31e7d2f

Автор SHA1 Опис Дата
  Adam 89e31e7d2f convert stops with less RAM 3 місяців тому
  Adam d4de60141a convert stops with less RAM 3 місяців тому
5 змінених файлів з 148 додано та 38 видалено
  1. 2 0
      go.mod
  2. 3 0
      go.sum
  3. 13 3
      traffic/convert.go
  4. 106 15
      traffic/convert_departures.go
  5. 24 20
      traffic/convert_stops.go

+ 2 - 0
go.mod

@@ -14,6 +14,7 @@ require (
 	github.com/adrg/strutil v0.3.0
 	github.com/cjoudrey/gluahttp v0.0.0-20201111170219-25003d9adfa9
 	github.com/dhconnelly/rtreego v1.1.0
+	github.com/lanrat/extsort v1.0.2
 	github.com/sahilm/fuzzy v0.1.0
 	github.com/ulikunitz/xz v0.5.10
 	github.com/yuin/gopher-lua v1.1.1
@@ -31,4 +32,5 @@ require (
 	github.com/kylelemons/godebug v1.1.0 // indirect
 	github.com/stretchr/testify v1.8.0 // indirect
 	golang.org/x/net v0.19.0 // indirect
+	golang.org/x/sync v0.1.0 // indirect
 )

+ 3 - 0
go.sum

@@ -30,6 +30,8 @@ github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
 github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
 github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc=
 github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
+github.com/lanrat/extsort v1.0.2 h1:p3MLVpQEPwEGPzeLBb+1eSErzRl6Bgjgr+qnIs2RxrU=
+github.com/lanrat/extsort v1.0.2/go.mod h1:ivzsdLm8Tv+88qbdpMElV6Z15StlzPUtZSKsGb51hnQ=
 github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
 github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
@@ -71,6 +73,7 @@ golang.org/x/net v0.19.0 h1:zTwKpTd2XuCqf8huc7Fo2iSy+4RHPd10s4KzeTnVr1c=
 golang.org/x/net v0.19.0/go.mod h1:CfAk/cbD4CthTvqiEl8NpboMuiuOYsAr/7NOjZJtv1U=
 golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
 golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=

+ 13 - 3
traffic/convert.go

@@ -270,14 +270,14 @@ func getGtfsFiles(input ...interface{}) (interface{}, error) {
 	names := []string{}
 	for i, version := range args.missingVersions {
 		name := fmt.Sprintf("%s_%d.zip", version.String(), i)
-		zipPath := filepath.Join(args.tmpFeedPath, name)
+		//zipPath := filepath.Join(args.tmpFeedPath, name)
 		url := version.Link
 		request, err := http.NewRequest("GET", url, nil)
 		if err != nil {
 			return gott.Tuple{args}, fmt.Errorf("while creating request for %s: %w", url, err)
 		}
 		request.Header.Add("If-None-Match", args.etags[url])
-		client := http.Client{} // todo timeout
+		/*client := http.Client{} // todo timeout
 		response, err := client.Do(request)
 		if err != nil {
 			return gott.Tuple{args}, fmt.Errorf("while downloading gtfs %s %w", name, err)
@@ -299,7 +299,7 @@ func getGtfsFiles(input ...interface{}) (interface{}, error) {
 		_, err = io.Copy(file, response.Body)
 		if err != nil {
 			return gott.Tuple{args}, fmt.Errorf("while copying gtfs %s %w", name, err)
-		}
+		}*/
 		names = append(names, name)
 	}
 	args.gtfsFilenames = names
@@ -1343,12 +1343,22 @@ func convert(input ...interface{}) (interface{}, error) {
 			Bind(readInputStopsIndex).
 			Bind(readInputRoutesIndex).
 			Bind(convertDepartures).
+			Map(dropInputRoutesIndex).
+			Map(dropInputStopsIndex).
+			Map(dropInputTripsIndex).
 			// ---
+			Tee(sortChangeOptions).
+			Tee(sortTripsThroughStop).
+			Bind(readChangeOptionsIndex).
+			Bind(readTripsThroughStopsIndex).
 			Bind(convertStops).
+			Map(dropInputRoutesIndex).
+			Map(dropInputStopsIndex).
 			Tee(writeTripIndex).
 			Map(clearTripOffsets).
 			Tee(writeStopNameIndex).
 			Tee(writeStopCodeIndex).
+			// ---
 			Bind(getTrips).
 			Bind(convertLineGraphs).
 			Map(clearStops).

+ 106 - 15
traffic/convert_departures.go

@@ -5,14 +5,18 @@
 package traffic
 
 import (
+	"bufio"
+	"context"
 	"encoding/csv"
 	"fmt"
 	"io"
+	"log"
 	"os"
 	"path/filepath"
 	"strings"
 
 	"git.sr.ht/~sircmpwn/go-bare"
+	"github.com/lanrat/extsort"
 )
 
 func readInputRoutesIndex(c feedConverter) (feedConverter, error) {
@@ -77,6 +81,7 @@ func convertDepartures(c feedConverter) (feedConverter, error) { // O(n:stop_tim
 	}
 	defer tripsFile.Close()
 
+	// TODO unnecessary overhead, should parse single csv header line
 	trips := csv.NewReader(tripsFile)
 	tripsHeader, err := trips.Read()
 	if err != nil {
@@ -92,10 +97,11 @@ func convertDepartures(c feedConverter) (feedConverter, error) { // O(n:stop_tim
 	if err != nil {
 		return c, fmt.Errorf("while opening stops file: %w", err)
 	}
-	defer tripsFile.Close()
+	defer stopsFile.Close()
 
-	stops := csv.NewReader(tripsFile)
-	stopsHeader, err := trips.Read()
+	// TODO unnecessary overhead, should parse single csv header line
+	stops := csv.NewReader(stopsFile)
+	stopsHeader, err := stops.Read()
 	if err != nil {
 		return c, fmt.Errorf("while reading stops header: %w", err)
 	}
@@ -109,10 +115,11 @@ func convertDepartures(c feedConverter) (feedConverter, error) { // O(n:stop_tim
 	if err != nil {
 		return c, fmt.Errorf("while opening routes file: %w", err)
 	}
-	defer tripsFile.Close()
+	defer routesFile.Close()
 
-	routes := csv.NewReader(tripsFile)
-	routesHeader, err := trips.Read()
+	// TODO unnecessary overhead, should parse single csv header line
+	routes := csv.NewReader(routesFile)
+	routesHeader, err := routes.Read()
 	if err != nil {
 		return c, fmt.Errorf("while reading routes header: %w", err)
 	}
@@ -147,14 +154,12 @@ func convertDepartures(c feedConverter) (feedConverter, error) { // O(n:stop_tim
 		stopID := record[fields["stop_id"]]
 
 		if previousTrip != tripID && previousTrip != "" {
-			tripsFile.Seek(c.tripsInputIndex[tripID], 0)
-			tripRecord, err := trips.Read()
+			tripRecord, err := readCsvLine(tripsFile, c.tripsInputIndex[tripID], len(tripsHeader))
 			if err != nil && err != io.EOF {
 				return fmt.Errorf("while reading a trips record: %w", err)
 			}
 
-			routesFile.Seek(c.routesInputIndex[tripID], 0)
-			routeRecord, err := routes.Read()
+			routeRecord, err := readCsvLine(routesFile, c.routesInputIndex[tripID], len(routesHeader))
 			if err != nil && err != io.EOF {
 				return fmt.Errorf("while reading a routes record: %w", err)
 			}
@@ -164,10 +169,9 @@ func convertDepartures(c feedConverter) (feedConverter, error) { // O(n:stop_tim
 			case HeadsignTripHeadsing:
 				trip.Headsign = record[fields["trip_headsign"]]
 			case HeadsignTripLastStop:
-				stopsFile.Seek(c.stopsInputIndex[stopID], 0)
-				stopRecord, err := stops.Read()
+				stopRecord, err := readCsvLine(stopsFile, c.stopsInputIndex[stopID], len(stopsHeader))
 				if err != nil && err != io.EOF {
-					return fmt.Errorf("while reading a trips record: %w", err)
+					return fmt.Errorf("while reading a stops record: %w", err)
 				}
 
 				trip.Headsign = stopRecord[stopsFields["stop_name"]]
@@ -243,7 +247,94 @@ func convertDepartures(c feedConverter) (feedConverter, error) { // O(n:stop_tim
 	})
 
 	c.tripsOffsets = tripsOffsets
-	// TODO after everything external-sort changeoptions.csv
-	// TODO after everything external-sort tripsthroughstop.csv (e.g. https://github.com/lanrat/extsort)
 	return c, err
 }
+
+func sortOutIndex(fileName string) error {
+	file, err := os.Open(fileName)
+	if err != nil {
+		return fmt.Errorf("while opening file: %w", err)
+	}
+	defer file.Close()
+
+	result, err := os.Create(fileName + "2")
+	if err != nil {
+		return fmt.Errorf("while creating file: %w", err)
+	}
+	defer result.Close()
+
+	scanner := bufio.NewScanner(file)
+	scanner.Scan()
+	headerLine := scanner.Text()
+
+	if err := scanner.Err(); err != nil {
+		return fmt.Errorf("while scanning: %w", err)
+	}
+	inputChan := make(chan string)
+	go func() {
+		for scanner.Scan() {
+			inputChan <- scanner.Text()
+		}
+		close(inputChan)
+	}()
+
+	sorter, outputChan, errChan := extsort.Strings(inputChan, nil)
+	sorter.Sort(context.Background())
+
+	result.WriteString(headerLine + "\n")
+	for data := range outputChan {
+		result.WriteString(data + "\n")
+	}
+
+	if err := <-errChan; err != nil {
+		return fmt.Errorf("while sorting: %w", err)
+	}
+
+	result.Close()
+	err = os.Rename(fileName+"2", fileName)
+	if err != nil {
+		return fmt.Errorf("while replacing file: %w", err)
+	}
+	return nil
+
+}
+
+func sortChangeOptions(c feedConverter) error {
+	return sortOutIndex(filepath.Join(c.TmpFeedPath, "changeoptions.csv"))
+}
+
+func sortTripsThroughStop(c feedConverter) error {
+	return sortOutIndex(filepath.Join(c.TmpFeedPath, "tripsthroughstop.csv"))
+}
+
+// TODO out to separate file
+func readCsvLine(r *os.File, offset int64, fields int) ([]string, error) {
+	if offset != -1 {
+		r.Seek(offset, 0)
+	}
+	line := []byte{}
+	for {
+		b := make([]byte, 1)
+		_, err := r.Read(b)
+		if err != nil {
+			if err == io.EOF {
+				break
+			}
+			return []string{}, fmt.Errorf("while reading byte: %w", err)
+		}
+		if b[0] == '\n' {
+			break
+		}
+		line = append(line, b[0])
+	}
+	// TODO unnecessary overhead, should parse single csv line, expecting $fields fields
+	csvReader := csv.NewReader(strings.NewReader(string(line)))
+	csvReader.FieldsPerRecord = fields
+	record, err := csvReader.Read()
+	if err != nil {
+		log.Printf("fields: %d\nline: %s\nrecord:%v\n", fields, string(line), record)
+		return record, fmt.Errorf("while reading record: %w", err)
+	}
+
+	return record, nil
+}

+ 24 - 20
traffic/convert_stops.go

@@ -120,28 +120,31 @@ func convertStops(c feedConverter) (feedConverter, error) { // O(n:stops) ; (tra
 		stopID := record[fields["stop_id"]]
 
 		stopTrips := map[string]StopOrder{}
-		tripsThroughStopFile.Seek(c.stopsInputIndex[stopID], 0) // TODO may not exist
-		for {
-			tripsThroughStopRecord, err := tripsThroughStop.Read()
-			if err != nil {
-				return fmt.Errorf("while reading tripsThroughStop record: %w", err)
-			}
-			recordStopID := tripsThroughStopRecord[tripsThroughStopFields["stop_id"]]
-			if stopID != recordStopID {
-				break
+		if position, ok := c.stopsInputIndex[stopID]; ok {
+			tripsThroughStopFile.Seek(position, 0)
+			for {
+				// FIXME wrong number of fields
+				tripsThroughStopRecord, err := readCsvLine(tripsThroughStopFile, -1, len(tripsThroughStopHeader))
+				if err != nil {
+					return fmt.Errorf("while reading tripsThroughStop record: %w", err)
+				}
+				recordStopID := tripsThroughStopRecord[tripsThroughStopFields["stop_id"]]
+				if stopID != recordStopID {
+					break
+				}
+				tripID := tripsThroughStopRecord[tripsThroughStopFields["trip_id"]]
+				var sequence int
+				fmt.Sscanf(tripsThroughStopRecord[tripsThroughStopFields["sequence"]], "%d", &sequence)
+				stopTrips[tripID] = StopOrder{
+					Sequence:   sequence,
+					TripOffset: c.tripsOffsets[tripID],
+				}
 			}
-			tripID := tripsThroughStopRecord[tripsThroughStopFields["trip_id"]]
-			var sequence int
-			fmt.Sscanf(tripsThroughStopRecord[tripsThroughStopFields["sequence"]], "%d", &sequence)
-			stopTrips[tripID] = StopOrder{
-				Sequence:   sequence,
-				TripOffset: c.tripsOffsets[tripID],
+			stopTripsLength := len(stopTrips)
+			if maxStopTripsLength < stopTripsLength {
+				maxStopTripsLength = stopTripsLength
 			}
 		}
-		stopTripsLength := len(stopTrips)
-		if maxStopTripsLength < stopTripsLength {
-			maxStopTripsLength = stopTripsLength
-		}
 
 		stop.Id = stopID
 
@@ -193,8 +196,9 @@ func convertStops(c feedConverter) (feedConverter, error) { // O(n:stops) ; (tra
 		stop.ChangeOptions = []ChangeOption{}
 		stop.Order = stopTrips
 
+		changeOptionsFile.Seek(c.routesInputIndex[stopID], 0)
 		for {
-			changeOptionsRecord, err := changeOptions.Read()
+			changeOptionsRecord, err := readCsvLine(changeOptionsFile, -1, len(changeOptionsHeader))
 			if err != nil {
 				return fmt.Errorf("while reading changeOptions record: %w", err)
 			}