2 Achegas b6f08c3689 ... a9e2136f74

Autor SHA1 Mensaxe Data
  Adam Evyčędo a9e2136f74 convert departures with less RAM usage hai 2 meses
  Adam Evyčędo df49752293 add input indices for converting hai 2 meses
Modificáronse 2 ficheiros con 273 adicións e 267 borrados
  1. 40 267
      traffic/convert.go
  2. 233 0
      traffic/convert_departures.go

+ 40 - 267
traffic/convert.go

@@ -82,29 +82,27 @@ type feedConverter struct {
 
 	Timezone            *time.Location
 	TrafficCalendarFile *os.File
-	Departures          map[string][]Departure
-	TripsThroughStop    map[string]map[string]StopOrder
-	LineNames           map[string]string
-	TripsOffsets        map[string]uint
-	TripChangeOpts      map[string]ChangeOption
-	StopsCodeIndex      CodeIndex
-	StopsNameIndex      map[string][]uint
-	Stops               map[string]string
-	LineGraphs          map[string]map[uint]LineGraph
-	lineHeadsigns       map[string]map[uint][]string
-	LineIndex           map[string][]uint
-	LineIdIndex         CodeIndex
-	ValidFrom           time.Time
-	ValidFromError      []error
-	ValidTill           time.Time
-	ValidTillError      []error
-	tripHeadsigns       map[string]string
-	stopNames           map[string]string
-	feedInfo            FeedInfo
-	defaultLanguage     string
-	translations        map[string]map[string]string
-	schedules           map[string]Schedule
-	trips               map[string]Trip
+	tripsInputIndex     map[string]int64
+	routesInputIndex    map[string]int64
+	stopsInputIndex     map[string]int64
+	tripsOffsets        map[string]uint
+
+	StopsCodeIndex  CodeIndex
+	StopsNameIndex  map[string][]uint
+	Stops           map[string]string
+	LineGraphs      map[string]map[uint]LineGraph
+	lineHeadsigns   map[string]map[uint][]string
+	LineIndex       map[string][]uint
+	LineIdIndex     CodeIndex
+	ValidFrom       time.Time
+	ValidFromError  []error
+	ValidTill       time.Time
+	ValidTillError  []error
+	feedInfo        FeedInfo
+	defaultLanguage string
+	translations    map[string]map[string]string
+	schedules       map[string]Schedule
+	trips           map[string]Trip
 }
 
 // helper functions
@@ -616,257 +614,39 @@ func closeTrafficCalendarFile(c feedConverter, e error) (feedConverter, error) {
 	return c, e
 }
 
-func clearDepartures(c feedConverter) feedConverter {
-	c.Departures = map[string][]Departure{}
-	return c
-}
-
-func convertDepartures(c feedConverter) (feedConverter, error) { // O(n:stop_times) ; ( -- departures:map[tripID][]departure, tripsThroughStop:map[stopID][]{tripID,order}, tripHeadsigns:map[tripID]stopID >> )
-	path := c.TmpFeedPath
-
-	file, err := os.Open(filepath.Join(path, "stop_times.txt"))
+func forEachRow(filename string, f func(int64, map[string]int, []string) error) error {
+	file, err := os.Open(filename)
 	if err != nil {
-		return c, fmt.Errorf("while opening file: %w", err)
+		return fmt.Errorf("while opening file: %w", err)
 	}
 	defer file.Close()
 
-	departures := map[string][]Departure{}
-
-	r := csv.NewReader(bufio.NewReader(file))
+	r := csv.NewReader(file)
 	header, err := r.Read()
 	if err != nil {
-		return c, fmt.Errorf("while reading header: %w", err)
+		return fmt.Errorf("while reading header: %w", err)
 	}
-	fields := map[string]int{}
-	for i, headerField := range header {
-		fields[headerField] = i
-	}
-
-	tripsThroughStop := map[string]map[string]StopOrder{}
-	tripHeadsigns := map[string]string{}
-
-	for {
-		departure := Departure{}
-		record, err := r.Read()
-		if err == io.EOF {
-			break
-		}
-		if err != nil {
-			return c, fmt.Errorf("while reading a record: %w", err)
-		}
-
-		stopID := record[fields["stop_id"]]
-
-		tripID := record[fields["trip_id"]]
-		fmt.Sscanf(record[fields["stop_sequence"]], "%d", &departure.StopSequence)
-		fmt.Sscanf(record[fields["pickup_type"]], "%d", &departure.Pickup)
-		fmt.Sscanf(record[fields["drop_off_type"]], "%d", &departure.Dropoff)
-
-		if _, ok := tripsThroughStop[stopID]; !ok {
-			tripsThroughStop[stopID] = map[string]StopOrder{}
-		}
-		tripsThroughStop[stopID][tripID] = StopOrder{
-			Sequence: departure.StopSequence,
-		}
-
-		if c.Feed.Flags().Headsign == HeadsignTripLastStop {
-			tripHeadsigns[tripID] = stopID
-		}
-
-		var hours, minutes uint
-		fmt.Sscanf(record[fields["arrival_time"]], "%d:%d", &hours, &minutes)
-		departure.Time = hours*60 + minutes
-
-		departures[tripID] = append(departures[tripID], departure)
-	}
-
-	c.tripHeadsigns = tripHeadsigns
-	c.Departures = departures
-	c.TripsThroughStop = tripsThroughStop
-	return c, nil
-}
-
-func clearLineNames(c feedConverter) feedConverter {
-	c.LineNames = map[string]string{}
-	return c
-}
-
-func getLineNames(c feedConverter) (feedConverter, error) { // O(n:routes) ; ( -- lineNames:map[routeID]lineName >> )
-	path := c.TmpFeedPath
 
-	file, err := os.Open(filepath.Join(path, "routes.txt"))
-	if err != nil {
-		return c, fmt.Errorf("while opening file: %w", err)
-	}
-	defer file.Close()
-	r := csv.NewReader(bufio.NewReader(file))
-	header, err := r.Read()
-	if err != nil {
-		return c, fmt.Errorf("while reading header: %w", err)
-	}
 	fields := map[string]int{}
 	for i, headerField := range header {
 		fields[headerField] = i
 	}
 
-	names := map[string]string{}
-
 	for {
+		offset := r.InputOffset()
 		record, err := r.Read()
 		if err == io.EOF {
 			break
 		}
 		if err != nil {
-			return c, fmt.Errorf("while reading a record: %w", err)
-		}
-
-		routeID := record[fields["route_id"]]
-		lineName := c.Feed.Flags().LineName
-		for _, template := range []string{"route_short_name", "route_long_name"} {
-			lineName = strings.Replace(lineName, "{{"+template+"}}", record[fields[template]], -1)
-		}
-		names[routeID] = lineName
-	}
-
-	c.LineNames = names
-	return c, nil
-}
-
-func clearStopNames(c feedConverter) feedConverter {
-	c.stopNames = map[string]string{}
-	return c
-}
-
-func getStopNames(c feedConverter) (feedConverter, error) { // O(n:stops) ; ( -- stopNames[stopID]stopName >> )
-	if c.Feed.Flags().Headsign != HeadsignTripLastStop {
-		return c, nil
-	}
-
-	stopNames := map[string]string{}
-
-	path := c.TmpFeedPath
-
-	file, err := os.Open(filepath.Join(path, "stops.txt"))
-	if err != nil {
-		return c, fmt.Errorf("while opening file: %w", err)
-	}
-	defer file.Close()
-
-	r := csv.NewReader(bufio.NewReader(file))
-	header, err := r.Read()
-	if err != nil {
-		return c, fmt.Errorf("while reading header: %w", err)
-	}
-	fields := map[string]int{}
-	for i, headerField := range header {
-		fields[headerField] = i
-	}
-
-	for {
-		record, err := r.Read()
-		if err == io.EOF {
-			break
+			return fmt.Errorf("while reading a record: %w", err)
 		}
+		err = f(offset, fields, record)
 		if err != nil {
-			return c, fmt.Errorf("while reading a record: %w", err)
+			return fmt.Errorf("while performing function: %w", err)
 		}
-
-		stopID := record[fields["stop_id"]]
-		stopName := record[fields["stop_name"]]
-		stopNames[stopID] = stopName
 	}
-
-	c.stopNames = stopNames
-
-	return c, nil
-}
-
-func clearTripsChangeOptions(c feedConverter) feedConverter {
-	c.TripChangeOpts = map[string]ChangeOption{}
-	return c
-}
-
-func clearTripsThroughStops(c feedConverter) feedConverter {
-	c.TripsThroughStop = map[string]map[string]StopOrder{}
-	return c
-}
-
-func convertTrips(c feedConverter) (feedConverter, error) { // O(n:trips) ; (departures, lineNames, stopNames -- tripsOffsets:map[tripID]offset, tripsChangeOpts:map[tripID]{lineID,headsign} >> trips)
-	path := c.TmpFeedPath
-	departures := c.Departures
-	lineNames := c.LineNames
-
-	file, err := os.Open(filepath.Join(path, "trips.txt"))
-	if err != nil {
-		return c, fmt.Errorf("while opening file: %w", err)
-	}
-	defer file.Close()
-
-	result, err := os.Create(filepath.Join(path, "trips.bare"))
-	if err != nil {
-		return c, fmt.Errorf("while creating file: %w", err)
-	}
-	defer result.Close()
-
-	r := csv.NewReader(bufio.NewReader(file))
-	header, err := r.Read()
-	if err != nil {
-		return c, fmt.Errorf("while reading header: %w", err)
-	}
-	fields := map[string]int{}
-	for i, headerField := range header {
-		fields[headerField] = i
-	}
-
-	var offset uint = 0
-	tripsOffsets := map[string]uint{}
-
-	tripChangeOpts := map[string]ChangeOption{}
-
-	for {
-		trip := Trip{}
-		record, err := r.Read()
-		if err == io.EOF {
-			break
-		}
-		if err != nil {
-			return c, fmt.Errorf("while reading a record: %w", err)
-		}
-
-		trip.Id = record[fields["trip_id"]]
-		switch c.Feed.Flags().Headsign {
-		case HeadsignTripHeadsing:
-			trip.Headsign = record[fields["trip_headsign"]]
-		case HeadsignTripLastStop:
-			trip.Headsign = c.stopNames[c.tripHeadsigns[trip.Id]]
-		}
-
-		trip.Departures = departures[trip.Id]
-		trip.ScheduleID = record[fields["service_id"]]
-		trip.LineID = record[fields["route_id"]]
-		fmt.Sscanf(record[fields["direction_id"]], "%d", &trip.Direction)
-
-		tripChangeOpts[trip.Id] = ChangeOption{
-			LineName:            lineNames[record[fields["route_id"]]],
-			Headsign:            translateFieldDefault(trip.Headsign, c.feedInfo.Language, c.defaultLanguage, c.translations),
-			TranslatedHeadsigns: translateField(trip.Headsign, c.feedInfo.Language, c.defaultLanguage, c.translations),
-		}
-
-		bytes, err := bare.Marshal(&trip)
-		if err != nil {
-			return c, fmt.Errorf("while marshalling: %w", err)
-		}
-		b, err := result.Write(bytes)
-		if err != nil {
-			return c, fmt.Errorf("while writing: %w", err)
-		}
-		tripsOffsets[trip.Id] = offset
-		offset += uint(b)
-	}
-
-	c.TripsOffsets = tripsOffsets
-	c.TripChangeOpts = tripChangeOpts
-	return c, nil
+	return nil
 }
 
 func clearStops(c feedConverter) feedConverter {
@@ -876,9 +656,7 @@ func clearStops(c feedConverter) feedConverter {
 
 func convertStops(c feedConverter) (feedConverter, error) { // O(n:stops) ; (translations, tripsThroughStop, tripChangeOpts, tripOffsets -- stopsOffsetsByCode:CodeIndex, stopsOffsetsByName:map[name][]offsets >> stops)
 	path := c.TmpFeedPath
-	tripsThroughStop := c.TripsThroughStop
-	tripChangeOpts := c.TripChangeOpts
-	tripsOffsets := c.TripsOffsets
+	tripsOffsets := c.tripsOffsets
 
 	file, err := os.Open(filepath.Join(path, "stops.txt"))
 	if err != nil {
@@ -1039,7 +817,7 @@ func convertStops(c feedConverter) (feedConverter, error) { // O(n:stops) ; (tra
 }
 
 func clearTripOffsets(c feedConverter) feedConverter {
-	c.TripsOffsets = map[string]uint{}
+	c.tripsOffsets = map[string]uint{}
 	return c
 }
 
@@ -1635,11 +1413,11 @@ func writeLineIdIndex(c feedConverter) error {
 
 func writeTripIndex(c feedConverter) error {
 	tripIndex := map[string][]uint{}
-	for trip, offset := range c.TripsOffsets {
+	for trip, offset := range c.tripsOffsets {
 		tripIndex[trip] = []uint{offset}
 	}
 	err := writeNameIndex(c, tripIndex, "ix_trips.bare", true)
-	c.TripsOffsets = map[string]uint{}
+	c.tripsOffsets = map[string]uint{}
 	return err
 }
 
@@ -1670,7 +1448,6 @@ func writeCodeIndex(c feedConverter, i CodeIndex, filename string) error {
 }
 
 func deleteTxtFiles(c feedConverter) error {
-	return nil
 	return file.DeleteTxtFiles(c.TmpFeedPath, c.GtfsFilename)
 }
 
@@ -1723,21 +1500,17 @@ func convert(input ...interface{}) (interface{}, error) {
 			Tee(saveSchedules).
 			Tee(saveFeedInfo).
 			Recover(closeTrafficCalendarFile).
+			// ---
+			Bind(readInputTripsIndex).
+			Bind(readInputStopsIndex).
+			Bind(readInputRoutesIndex).
 			Bind(convertDepartures).
-			Bind(getLineNames).
-			Bind(getStopNames).
-			Bind(convertTrips).
-			Map(clearDepartures).
-			Map(clearStopNames).
-			Map(clearLineNames).
+			// ---
 			Bind(convertStops).
 			Tee(writeTripIndex).
 			Map(clearTripOffsets).
 			Tee(writeStopNameIndex).
 			Tee(writeStopCodeIndex).
-			Map(clearTripsChangeOptions).
-			Map(clearTripsThroughStops).
-			Map(clearLineNames).
 			Bind(getTrips).
 			Bind(convertLineGraphs).
 			Map(clearStops).

+ 233 - 0
traffic/convert_departures.go

@@ -0,0 +1,233 @@
+// SPDX-FileCopyrightText: Adam Evyčędo
+//
+// SPDX-License-Identifier: AGPL-3.0-or-later
+
+package traffic
+
+import (
+	"encoding/csv"
+	"fmt"
+	"io"
+	"os"
+	"path/filepath"
+	"strings"
+
+	"git.sr.ht/~sircmpwn/go-bare"
+)
+
+// readInputRoutesIndex walks routes.txt in the temporary feed directory and
+// stores, for every route_id, the input offset that forEachRow reports for
+// that row in c.routesInputIndex.
+//
+// The index is stored on the converter even when forEachRow fails; the error
+// is returned unchanged alongside it.
+func readInputRoutesIndex(c feedConverter) (feedConverter, error) {
+	routesPath := filepath.Join(c.TmpFeedPath, "routes.txt")
+	offsets := map[string]int64{}
+
+	// When the same route_id appears more than once, the last row wins.
+	remember := func(at int64, columns map[string]int, row []string) error {
+		offsets[row[columns["route_id"]]] = at
+		return nil
+	}
+
+	err := forEachRow(routesPath, remember)
+	c.routesInputIndex = offsets
+	return c, err
+}
+
+// readInputStopsIndex walks stops.txt in the temporary feed directory and
+// stores, for every stop_id, the input offset that forEachRow reports for
+// that row in c.stopsInputIndex.
+//
+// Any error from forEachRow (unreadable file, malformed CSV) is returned to
+// the caller, matching readInputRoutesIndex and readInputTripsIndex. It used
+// to be dropped silently, which left a partial or empty index behind.
+func readInputStopsIndex(c feedConverter) (feedConverter, error) {
+	index := map[string]int64{}
+
+	path := c.TmpFeedPath
+	err := forEachRow(filepath.Join(path, "stops.txt"), func(offset int64, fields map[string]int, record []string) error {
+		stopID := record[fields["stop_id"]]
+		index[stopID] = offset
+		return nil
+	})
+
+	c.stopsInputIndex = index
+	return c, err
+}
+
+// readInputTripsIndex walks trips.txt in the temporary feed directory and
+// stores, for every trip_id, the input offset that forEachRow reports for
+// that row in c.tripsInputIndex.
+//
+// The index is stored on the converter even when forEachRow fails; the error
+// is returned unchanged alongside it.
+func readInputTripsIndex(c feedConverter) (feedConverter, error) {
+	tripsPath := filepath.Join(c.TmpFeedPath, "trips.txt")
+	offsets := map[string]int64{}
+
+	// When the same trip_id appears more than once, the last row wins.
+	remember := func(at int64, columns map[string]int, row []string) error {
+		offsets[row[columns["trip_id"]]] = at
+		return nil
+	}
+
+	err := forEachRow(tripsPath, remember)
+	c.tripsInputIndex = offsets
+	return c, err
+}
+
+// openInputWithHeader opens the table <name>.txt inside dir and reads its
+// header row. It returns the open file, which the caller must close, and a
+// map from column name to column index.
+func openInputWithHeader(dir, name string) (*os.File, map[string]int, error) {
+	file, err := os.Open(filepath.Join(dir, name+".txt"))
+	if err != nil {
+		return nil, nil, fmt.Errorf("while opening %s file: %w", name, err)
+	}
+	header, err := csv.NewReader(file).Read()
+	if err != nil {
+		file.Close()
+		return nil, nil, fmt.Errorf("while reading %s header: %w", name, err)
+	}
+	fields := make(map[string]int, len(header))
+	for i, headerField := range header {
+		fields[headerField] = i
+	}
+	return file, fields, nil
+}
+
+// readInputRecordAt returns the single CSV record that starts at the given
+// byte offset of file.
+//
+// csv.Reader buffers its input, so seeking the file underneath a long-lived
+// reader does not change what that reader returns next. A fresh reader is
+// therefore created after every seek.
+func readInputRecordAt(file *os.File, offset int64) ([]string, error) {
+	if _, err := file.Seek(offset, io.SeekStart); err != nil {
+		return nil, fmt.Errorf("while seeking: %w", err)
+	}
+	record, err := csv.NewReader(file).Read()
+	if err != nil {
+		return nil, fmt.Errorf("while reading: %w", err)
+	}
+	return record, nil
+}
+
+// convertDepartures streams stop_times.txt once and writes three outputs into
+// the temporary feed directory:
+//
+//   - trips.bare: one BARE-encoded Trip per trip, with its departures;
+//   - changeoptions.csv: one row per trip with line name and headsign(s);
+//   - tripsthroughstop.csv: one row (stop_id, trip_id, stop_sequence) per
+//     stop_times row.
+//
+// Trip, route and stop details are fetched on demand from trips.txt,
+// routes.txt and stops.txt through the offsets in c.tripsInputIndex,
+// c.routesInputIndex and c.stopsInputIndex, so only one trip's departures are
+// held in memory at a time. The offset of every trip written to trips.bare is
+// stored in c.tripsOffsets.
+//
+// A trip is considered finished when trip_id changes between consecutive
+// rows, so the rows of one trip must be contiguous in stop_times.txt.
+// A trip, route or stop that is referenced but missing from its index is
+// reported as an error.
+func convertDepartures(c feedConverter) (feedConverter, error) { // O(n:stop_times) ; (TmpFeedPath, tripsInputIndex, routesInputIndex, stopsInputIndex -- tripsOffsets:map[tripID]offset >> trips)
+	path := c.TmpFeedPath
+
+	result, err := os.Create(filepath.Join(path, "trips.bare"))
+	if err != nil {
+		return c, fmt.Errorf("while creating file: %w", err)
+	}
+	defer result.Close()
+
+	tripsFile, tripsFields, err := openInputWithHeader(path, "trips")
+	if err != nil {
+		return c, err
+	}
+	defer tripsFile.Close()
+
+	stopsFile, stopsFields, err := openInputWithHeader(path, "stops")
+	if err != nil {
+		return c, err
+	}
+	defer stopsFile.Close()
+
+	routesFile, routesFields, err := openInputWithHeader(path, "routes")
+	if err != nil {
+		return c, err
+	}
+	defer routesFile.Close()
+
+	changeOptionsFile, err := os.Create(filepath.Join(path, "changeoptions.csv"))
+	if err != nil {
+		return c, fmt.Errorf("while creating changeOptions file: %w", err)
+	}
+	defer changeOptionsFile.Close()
+	changeOptions := csv.NewWriter(changeOptionsFile)
+
+	tripsThroughStopFile, err := os.Create(filepath.Join(path, "tripsthroughstop.csv"))
+	if err != nil {
+		return c, fmt.Errorf("while creating tripsThroughStop file: %w", err)
+	}
+	defer tripsThroughStopFile.Close()
+	tripsThroughStop := csv.NewWriter(tripsThroughStopFile)
+
+	tripsOffsets := map[string]uint{}
+	var outputOffset uint
+	previousTrip := ""
+	previousStop := ""
+	trip := Trip{}
+
+	// finishTrip completes the trip whose departures have been accumulated in
+	// trip (identified by previousTrip, last served stop previousStop), then
+	// writes it to trips.bare and its change option to changeoptions.csv.
+	finishTrip := func() error {
+		tripOffset, ok := c.tripsInputIndex[previousTrip]
+		if !ok {
+			return fmt.Errorf("trip %q not found in trips index", previousTrip)
+		}
+		tripRecord, err := readInputRecordAt(tripsFile, tripOffset)
+		if err != nil {
+			return fmt.Errorf("while reading a trips record: %w", err)
+		}
+
+		// The routes index is keyed by route_id, which comes from the trip.
+		routeID := tripRecord[tripsFields["route_id"]]
+		routeOffset, ok := c.routesInputIndex[routeID]
+		if !ok {
+			return fmt.Errorf("route %q not found in routes index", routeID)
+		}
+		routeRecord, err := readInputRecordAt(routesFile, routeOffset)
+		if err != nil {
+			return fmt.Errorf("while reading a routes record: %w", err)
+		}
+
+		trip.Id = previousTrip
+		switch c.Feed.Flags().Headsign {
+		case HeadsignTripHeadsing:
+			trip.Headsign = tripRecord[tripsFields["trip_headsign"]]
+		case HeadsignTripLastStop:
+			stopOffset, ok := c.stopsInputIndex[previousStop]
+			if !ok {
+				return fmt.Errorf("stop %q not found in stops index", previousStop)
+			}
+			stopRecord, err := readInputRecordAt(stopsFile, stopOffset)
+			if err != nil {
+				return fmt.Errorf("while reading a stops record: %w", err)
+			}
+			trip.Headsign = stopRecord[stopsFields["stop_name"]]
+		}
+		// TODO translated headsign(-s)
+
+		trip.ScheduleID = tripRecord[tripsFields["service_id"]]
+		trip.LineID = routeID
+		// A direction_id that does not parse leaves Direction at its zero
+		// value, so the scan error is ignored on purpose.
+		fmt.Sscanf(tripRecord[tripsFields["direction_id"]], "%d", &trip.Direction)
+
+		encoded, err := bare.Marshal(&trip)
+		if err != nil {
+			return fmt.Errorf("while marshalling: %w", err)
+		}
+		written, err := result.Write(encoded)
+		if err != nil {
+			return fmt.Errorf("while writing: %w", err)
+		}
+		tripsOffsets[trip.Id] = outputOffset
+		outputOffset += uint(written)
+
+		lineName := c.Feed.Flags().LineName
+		for _, template := range []string{"route_short_name", "route_long_name"} {
+			lineName = strings.Replace(lineName, "{{"+template+"}}", routeRecord[routesFields[template]], -1)
+		}
+		headsign := translateFieldDefault(trip.Headsign, c.feedInfo.Language, c.defaultLanguage, c.translations)
+		translatedHeadsigns := translateField(trip.Headsign, c.feedInfo.Language, c.defaultLanguage, c.translations)
+		// NOTE(review): the first column used to be the stop_id of whichever
+		// stop_times row triggered the write (the first stop of the *next*
+		// trip). It is now the last stop of the finished trip; confirm which
+		// key the later external sort is meant to use.
+		changeOptionsRecord := []string{previousStop, routeRecord[routesFields["route_id"]], lineName, headsign}
+		for _, translatedHeadsign := range translatedHeadsigns {
+			changeOptionsRecord = append(changeOptionsRecord, translatedHeadsign.Language)
+			changeOptionsRecord = append(changeOptionsRecord, translatedHeadsign.Value)
+		}
+		if err := changeOptions.Write(changeOptionsRecord); err != nil {
+			return fmt.Errorf("while writing changeOptions record: %w", err)
+		}
+		return nil
+	}
+
+	err = forEachRow(filepath.Join(path, "stop_times.txt"), func(_ int64, fields map[string]int, record []string) error {
+		departure := Departure{}
+		tripID := record[fields["trip_id"]]
+		stopID := record[fields["stop_id"]]
+
+		// A new trip_id means every departure of the previous trip has been
+		// seen, so it can be written out and its memory released.
+		if previousTrip != "" && previousTrip != tripID {
+			if err := finishTrip(); err != nil {
+				return err
+			}
+			trip = Trip{}
+		}
+
+		fmt.Sscanf(record[fields["stop_sequence"]], "%d", &departure.StopSequence)
+		fmt.Sscanf(record[fields["pickup_type"]], "%d", &departure.Pickup)
+		fmt.Sscanf(record[fields["drop_off_type"]], "%d", &departure.Dropoff)
+
+		stopSequence := fmt.Sprintf("%d", departure.StopSequence)
+		tripsThroughStopRecord := []string{stopID, tripID, stopSequence}
+		if err := tripsThroughStop.Write(tripsThroughStopRecord); err != nil {
+			return fmt.Errorf("while writing tripsThroughStop record: %w", err)
+		}
+
+		departureTime, err := parseDepartureTime(record[fields["arrival_time"]])
+		if err != nil {
+			return fmt.Errorf("while parsing arrival time: %w", err)
+		}
+		departure.Time = uint(departureTime)
+
+		trip.Departures = append(trip.Departures, departure)
+		previousTrip = tripID
+		previousStop = stopID
+
+		return nil
+	})
+	if err != nil {
+		return c, err
+	}
+
+	// The last trip has no following row to trigger its write.
+	if previousTrip != "" {
+		if err := finishTrip(); err != nil {
+			return c, err
+		}
+	}
+
+	// csv.Writer buffers its output; without Flush the tail of each file is
+	// lost, and Error reports any write failure that happened in between.
+	changeOptions.Flush()
+	if err := changeOptions.Error(); err != nil {
+		return c, fmt.Errorf("while flushing changeOptions file: %w", err)
+	}
+	tripsThroughStop.Flush()
+	if err := tripsThroughStop.Error(); err != nil {
+		return c, fmt.Errorf("while flushing tripsThroughStop file: %w", err)
+	}
+
+	c.tripsOffsets = tripsOffsets
+	// TODO after everything external-sort changeoptions.csv
+	// TODO after everything external-sort tripsthroughstop.csv (e.g. https://github.com/lanrat/extsort)
+	return c, nil
+}