Author: Adam Evyčędo <git@apiote.xyz>
fixes for new RT
api/api.go | 9 +-- gtfs_rt/new.go | 2 traffic/access.go | 17 +++--- traffic/berlin_vbb.go | 4 traffic/brussels_stib_mivb.go | 4 traffic/convert.go | 3 traffic/feeds.go | 2 traffic/gzm_ztm.go | 89 ++---------------------------------- traffic/krakow_ztp.go | 10 ++-- traffic/poznan_ztm.go | 7 ++ traffic/realtime.go | 44 +++++++++-------- traffic/realtime_gtfs.go | 33 +++++++++----
diff --git a/api/api.go b/api/api.go index f3b950de9b401a75d64da8dc73582b8ddb74b993..0bf0bc8f46a8dfa1e171d1b1dd4705b3e0b999c1 100644 --- a/api/api.go +++ b/api/api.go @@ -151,11 +151,10 @@ line, err := traffic.GetLine(vehicle.LineName, context, t) if err != nil { return VehicleV1{}, fmt.Errorf("while getting line %s: %w", vehicle.LineName, err) } - log.Printf("convertTrafficVehicle:: trafficVehicle: %+v, line: %+v\n", vehicle, line) return VehicleV1{ Id: string(vehicle.VehicleID), Position: PositionV1{vehicle.Latitude, vehicle.Longitude}, - Capabilities: t.Vehicles[string(context.Version)][traffic.Validity(context.FeedID)][vehicle.VehicleID].Capabilities, + Capabilities: t.Vehicles[context.FeedID][context.Version][vehicle.VehicleID].Capabilities, Speed: vehicle.Speed, Line: LineStubV1{Name: line.Name, Kind: makeLineTypeV1(line), Colour: fromColor(line.Colour)}, Headsign: vehicle.Headsign, @@ -171,11 +170,10 @@ line, err := traffic.GetLine(vehicle.LineName, context, t) if err != nil { return VehicleV2{}, fmt.Errorf("while getting line %s: %w", vehicle.LineName, err) } - log.Printf("convertTrafficVehicle:: trafficVehicle: %+v, line: %+v\n", vehicle, line) return VehicleV2{ Id: string(vehicle.VehicleID), Position: PositionV1{vehicle.Latitude, vehicle.Longitude}, - Capabilities: t.Vehicles[string(context.Version)][traffic.Validity(context.FeedID)][vehicle.VehicleID].Capabilities, + Capabilities: t.Vehicles[context.FeedID][context.Version][vehicle.VehicleID].Capabilities, Speed: vehicle.Speed, Line: LineStubV2{Name: line.Name, Kind: makeLineTypeV2(line), Colour: fromColor(line.Colour)}, Headsign: vehicle.Headsign, @@ -191,11 +189,10 @@ line, err := traffic.GetLine(vehicle.LineName, context, t) if err != nil { return VehicleV3{}, fmt.Errorf("while getting line %s: %w", vehicle.LineName, err) } - log.Printf("convertTrafficVehicle:: trafficVehicle: %+v, line: %+v\n", vehicle, line) return VehicleV3{ Id: string(vehicle.VehicleID), Position: PositionV1{vehicle.Latitude, vehicle.Longitude}, - Capabilities: t.Vehicles[string(context.Version)][traffic.Validity(context.FeedID)][vehicle.VehicleID].Capabilities, + Capabilities: t.Vehicles[context.FeedID][context.Version][vehicle.VehicleID].Capabilities, Speed: vehicle.Speed, Line: LineStubV3{Name: line.Name, Kind: makeLineTypeV3(line), Colour: fromColor(line.Colour)}, Headsign: vehicle.Headsign, diff --git a/gtfs_rt/new.go b/gtfs_rt/new.go index 78e4cf47c04f4553172808a6f44e0dff0619149b..a188d3d09c3a97e0570ebda411aadb76edc460dd 100644 --- a/gtfs_rt/new.go +++ b/gtfs_rt/new.go @@ -11,7 +11,7 @@ "google.golang.org/protobuf/proto" ) -func GetMessages2(feedID string, feedURL string) (*pb.FeedMessage, error) { +func GetMessages(feedID string, feedURL string) (*pb.FeedMessage, error) { var message *pb.FeedMessage client := http.Client{Timeout: 5 * time.Second} response, err := client.Get(feedURL) diff --git a/traffic/access.go b/traffic/access.go index 41fb32d22149592de5c645cbc07ed1cf1e673f80..6720c4de2981e1232f8f2d85dcecdff024dc11a7 100644 --- a/traffic/access.go +++ b/traffic/access.go @@ -272,11 +272,12 @@ result.Departures = departures return result, nil } -func makeDeparturesRealtime(input ...interface{}) interface{} { +func makeDeparturesRealtime(input ...interface{}) (interface{}, error) { result := input[0].(_Result) - result.Departures = enrichDepartures(result.Departures, result.Datetime, result.DeparturesType, result.Ctx, result.TripsFile, result.Location) + departures, err := enrichDepartures(result.Departures, result.Datetime, result.DeparturesType, result.Ctx, result.TripsFile, result.Location) result.TripsFile.Close() - return result + result.Departures = departures + return result, err } func getDeparture(date time.Time, result _Result, order StopOrder, @@ -703,6 +704,7 @@ DeparturesType: departuresType, Vehicles: vehicles, Feed: traffic.Feeds[ctx.FeedID], Ctx: ctx, + Traffic: traffic, } r, e := gott.NewResult(result). @@ -717,7 +719,7 @@ Bind(unmarshalStop). Bind(openTripsFile). Bind(readTrips). Bind(getDepartures). - Map(makeDeparturesRealtime). + Bind(makeDeparturesRealtime). Map(filterDepartures). Map(filterDeparturesByLine). Map(sortDepartures). @@ -811,7 +813,7 @@ } } func getLineByOffset(offset uint, dataHome string, feedName string, - versionCode Validity, traffic *Traffic) (Line, error) { + versionCode Validity) (Line, error) { result := _Result{ Filename: "lines.bare", Offset: offset, @@ -947,7 +949,7 @@ if err != nil { return Line{}, err } if o.Name == cleanedName { - return getLineByOffset(o.Offsets[0], context.DataHome, context.FeedID, context.Version, traffic) + return getLineByOffset(o.Offsets[0], context.DataHome, context.FeedID, context.Version) } } return Line{}, nil @@ -974,8 +976,7 @@ } results := fuzzy.FindFrom(cleanQuery, index) for _, result := range results { for _, offset := range index[result.Index].Offsets { - line, err := getLineByOffset(offset, dataHome, feedName, - versionCode, traffic) + line, err := getLineByOffset(offset, dataHome, feedName, versionCode) if err != nil { return lines, fmt.Errorf("while getting line for %s: %w", result.Str, err) } diff --git a/traffic/berlin_vbb.go b/traffic/berlin_vbb.go index 5a43b6e68e0b696d554f8d9ae95b22162ad90240..ad7096c96ec16c145faec13b7976b00615c1cccd 100644 --- a/traffic/berlin_vbb.go +++ b/traffic/berlin_vbb.go @@ -49,8 +49,8 @@ func (z VbbBerlin) String() string { return "berlin_vbb" } -func (z VbbBerlin) RealtimeFeeds() []string { - return []string{} +func (z VbbBerlin) RealtimeFeeds() map[RealtimeFeedType]string { + return map[RealtimeFeedType]string{} } func (z VbbBerlin) Transformer() transform.Transformer { diff --git a/traffic/brussels_stib_mivb.go b/traffic/brussels_stib_mivb.go index 349a5e11bfeb38a2550ab40a4c2b96f01a2f0d30..c13f79be2eb874750f2a161e9df7c61e96bf5a15 100644 --- a/traffic/brussels_stib_mivb.go +++ b/traffic/brussels_stib_mivb.go @@ -59,8 +59,8 @@ func (z StibMivbBrussels) String() string { return "brussels_stib_mivb" } -func (z StibMivbBrussels) RealtimeFeeds() []string { - return []string{} +func (z StibMivbBrussels) RealtimeFeeds() map[RealtimeFeedType]string { + return map[RealtimeFeedType]string{} } func (z StibMivbBrussels) Transformer() transform.Transformer { diff --git a/traffic/convert.go b/traffic/convert.go index 9f74a56e08a7c1c970e6d82a01fba16f3a512c62..5cff248b5bec7260e0d6b21c4223812793c25dbe 100644 --- a/traffic/convert.go +++ b/traffic/convert.go @@ -1325,7 +1325,6 @@ feedInfo.Language = record[fields["feed_lang"]] if defaultLanguageIndex, ok := fields["default_lang"]; ok { c.defaultLanguage = record[defaultLanguageIndex] } - feedInfo.Timezone = c.Timezone.String() if ix, ok := fields["feed_start_date"]; ok { c.ValidFrom, err = time.ParseInLocation("20060102", record[ix], c.Timezone) @@ -1343,6 +1342,8 @@ feedInfo.ValidTill = record[ix] } } + feedInfo.Timezone = c.Timezone.String() + feedInfo.RealtimeFeeds = c.Feed.RealtimeFeeds() feedInfo.QrHost, feedInfo.QrLocation, feedInfo.QrSelector = c.Feed.QRInfo() feedInfo.Attributions, feedInfo.Descriptions, err = getAttrDesc(c.Feed.String(), c.feedTranslations) diff --git a/traffic/feeds.go b/traffic/feeds.go index fb6ea9f02043d34a0af2b0b454c1bbb571a33426..11fc0a9d5b1d6b6e403a7242c68d32ed15496404 100644 --- a/traffic/feeds.go +++ b/traffic/feeds.go @@ -17,7 +17,7 @@ type Feed interface { fmt.Stringer ConvertVehicles(string) error // TODO return []Vehicle -> save to file in convert() GetVersions(time.Time, *time.Location) ([]Version, error) - RealtimeFeeds() []string // TODO map[RealtimeFeedType]string + RealtimeFeeds() map[RealtimeFeedType]string Transformer() transform.Transformer Name() string Flags() FeedFlags diff --git a/traffic/gzm_ztm.go b/traffic/gzm_ztm.go index 58491c76ca23126c42ab74464364a87ace073d10..e0de76636826a474e05123b398895e5c46a6e0ed 100644 --- a/traffic/gzm_ztm.go +++ b/traffic/gzm_ztm.go @@ -32,86 +32,6 @@ return fmt.Errorf("ConvertVehicles: cannot create bare file: %w", err) } defer result.Close() return nil - // https://otwartedane.metropoliagzm.pl/dataset/wyposazenie-pojazdu/resource/26c3d1b4-7aa0-4006-b436-4a3f4ab63ac1 - // https://otwartedane.metropoliagzm.pl/dataset/pojazdy/resource/d4a3e7a6-d681-486d-b4e4-53d4c32559f9 - // url := "https://ztm.poznan.pl/en/dla-deweloperow/getGtfsRtFile/?file=vehicle_dictionary.csv" - // response, err := z.client.Get(url) - // if err != nil { - // return fmt.Errorf("ConvertVehicles: cannot GET ‘%s’: %w", url, err) - // } - - // result, err := os.Create(filepath.Join(path, "vehicles.bare")) - // if err != nil { - // return fmt.Errorf("ConvertVehicles: cannot create bare file: %w", err) - // } - // defer result.Close() - - // r := csv.NewReader(response.Body) - // r.Comma = ',' - // header, err := r.Read() - // if err != nil { - // fmt.Println("Header read error") - // return err - // } - // fields := map[string]int{} - // for i, headerField := range header { - // fields[headerField] = i - // } - - // for { - // record, err := r.Read() - // if err == io.EOF { - // break - // } - // if err != nil { - // return err - // } - - // var capabilites uint16 = 0 - - // if record[fields["ramp"]] == "1" { - // capabilites |= 0b0001 - // } - // if record[fields["hf_lf_le"]] == "1" { - // capabilites |= 0b0010 - // } - // if record[fields["hf_lf_le"]] == "2" { - // capabilites |= 0b0001_0000_0000 - // } - // if record[fields["air_conditioner"]] == "1" { - // capabilites |= 0b0100 - // } - // if record[fields["place_for_transp_bicycles"]] == "1" { - // capabilites |= 0b1000 - // } - // if record[fields["voice_announcement_sys"]] == "1" { - // capabilites |= 0b0001_0000 - // } - // if record[fields["ticket_machine"]] == "1" { - // capabilites |= 0b0010_0000 - // } - // if record[fields["ticket_sales_by_the_driver"]] == "1" { - // capabilites |= 0b0100_0000 - // } - // if record[fields["usb_charger"]] == "1" { - // capabilites |= 0b1000_0000 - // } - - // vehicle := Vehicle{ - // Id: ID(record[0]), - // Capabilities: capabilites, - // } - - // bytes, err := bare.Marshal(&vehicle) - // if err != nil { - // return err - // } - // _, err = result.Write(bytes) - // if err != nil { - // return err - // } - // } - // return nil } func (z GzmZtm) GetVersions(date time.Time, timezone *time.Location) ([]Version, error) { @@ -144,9 +64,12 @@ func (z GzmZtm) String() string { return "gzm_ztm" } -func (z GzmZtm) RealtimeFeeds() []string { - // return []string{"http://gtfsrt.metropoliagzm.pl:1111/gtfsrt/gzm/all"} - return []string{} +func (z GzmZtm) RealtimeFeeds() map[RealtimeFeedType]string { + return map[RealtimeFeedType]string{ + TRIP_UPDATES: "http://gtfsrt.metropoliagzm.pl:1111/gtfsrt/gzm/all", + VEHICLE_POSITIONS: "http://gtfsrt.metropoliagzm.pl:1111/gtfsrt/gzm/all", + } + // return map[RealtimeFeedType]string{} } func (z GzmZtm) Transformer() transform.Transformer { diff --git a/traffic/krakow_ztp.go b/traffic/krakow_ztp.go index d522b6f662edd806f43583a2d5c7dbc89d51a382..21c3e36ea0bec7b4acb3ebc01c4d6b550069a29b 100644 --- a/traffic/krakow_ztp.go +++ b/traffic/krakow_ztp.go @@ -41,12 +41,12 @@ // https://gtfs.ztp.krakow.pl/GTFS_KRK_T.zip return []Version{v}, nil } -func (ZtpKrakow) RealtimeFeeds() []string { - return []string{ - "https://gtfs.ztp.krakow.pl/ServiceAlerts_A.pb", - "https://gtfs.ztp.krakow.pl/TripUpdates_A.pb", +func (ZtpKrakow) RealtimeFeeds() map[RealtimeFeedType]string { + return map[RealtimeFeedType]string{ + ALERTS: "https://gtfs.ztp.krakow.pl/ServiceAlerts_A.pb", + TRIP_UPDATES: "https://gtfs.ztp.krakow.pl/TripUpdates_A.pb", //"https://gtfs.ztp.krakow.pl/TripUpdates_T.pb", - "https://gtfs.ztp.krakow.pl/VehiclePositions_A.pb", + VEHICLE_POSITIONS: "https://gtfs.ztp.krakow.pl/VehiclePositions_A.pb", //"https://gtfs.ztp.krakow.pl/VehiclePositions_T.pb", } } diff --git a/traffic/poznan_ztm.go b/traffic/poznan_ztm.go index 41779ebd16edf4856818261920db771f121608af..e5458c99c4f50534fe1a53df7b6f70ec415399d9 100644 --- a/traffic/poznan_ztm.go +++ b/traffic/poznan_ztm.go @@ -149,8 +149,11 @@ func (z ZtmPoznan) String() string { return "poznan_ztm" } -func (z ZtmPoznan) RealtimeFeeds() []string { - return []string{"https://ztm.poznan.pl/en/dla-deweloperow/getGtfsRtFile/?file=feeds.pb"} +func (z ZtmPoznan) RealtimeFeeds() map[RealtimeFeedType]string { + return map[RealtimeFeedType]string{ + TRIP_UPDATES: "https://ztm.poznan.pl/en/dla-deweloperow/getGtfsRtFile/?file=feeds.pb", + VEHICLE_POSITIONS: "https://ztm.poznan.pl/en/dla-deweloperow/getGtfsRtFile/?file=feeds.pb", + } } func (z ZtmPoznan) Transformer() transform.Transformer { diff --git a/traffic/realtime.go b/traffic/realtime.go index 2a6f7f14b912703a052897956670754098d45393..5788d124d398bac9d2c2d0f5d293415171e87302 100644 --- a/traffic/realtime.go +++ b/traffic/realtime.go @@ -108,20 +108,22 @@ } return trip.Id, nil } -func enrichDepartures(departures []DepartureRealtime, datetime time.Time, departuresType DeparturesType, ctx Context, tripsFile *os.File, timezone *time.Location) []DepartureRealtime { // TODO tripsFile -> map[tripOffset]tripID +func enrichDepartures(departures []DepartureRealtime, datetime time.Time, departuresType DeparturesType, ctx Context, tripsFile *os.File, timezone *time.Location) ([]DepartureRealtime, error) { // TODO tripsFile -> map[tripOffset]tripID enrichedDepartures := make([]DepartureRealtime, len(departures)) feedInfo, err := getFeedInfo(ctx.DataHome, ctx.FeedID, ctx.Version) if err != nil { log.Printf("while getting feedInfo: %v\n", err) - return departures + feedInfo = FeedInfo{} } var enrichMethod func(string, int, Context) (Update, error) - if _, ok := feedInfo.RealtimeFeeds[TRIP_UPDATES]; ok { - enrichMethod = getGtfsRealtimeUpdates - } else { - // TODO enrichMethod = getLuaRealtieUpdates + if feedInfo.Name != "" { + if _, ok := feedInfo.RealtimeFeeds[TRIP_UPDATES]; ok { + enrichMethod = getGtfsRealtimeUpdates + } else if false /* TODO lua script is there */ { + // TODO enrichMethod = getLuaRealtieUpdates + } } midnight := time.Date(datetime.Year(), datetime.Month(), datetime.Day(), 0, 0, 0, 0, timezone) @@ -132,30 +134,32 @@ offsets[i] = departure.Order.TripOffset } trips, err := GetTripsByOffset(offsets, ctx, func(Trip) bool { return true }) if err != nil { - log.Printf("while getting trips: %v\n", err) + return departures, fmt.Errorf("while getting trips: %w", err) } for i, departure := range departures { if departure.Time.After(midnight) { if err != nil { - log.Printf("while getting trip id for %s -> %s (%v): %v\n", departure.LineName, departure.Headsign, departure.Time, err) - continue + return departures, fmt.Errorf("while getting trip id for %s -> %s (%v): %w", departure.LineName, departure.Headsign, departure.Time, err) } - update, err := enrichMethod(trips[departure.Order.TripOffset].Id, departure.Order.Sequence, ctx) - if err != nil { - if isTimeout(err) { // TODO or any other connection problem - break - } else { - log.Printf("while enriching departure %s -> %s (%v): %v\n", departure.LineName, departure.Headsign, departure.Time, err) + var update Update + if enrichMethod != nil { + update, err = enrichMethod(trips[departure.Order.TripOffset].Id, departure.Order.Sequence, ctx) + if err != nil { + if isTimeout(err) { // TODO or any other connection problem + log.Printf("connection error while enriching departure %s -> %s (%v): %v", departure.LineName, departure.Headsign, departure.Time, err) + break + } else { + log.Printf("while enriching departure %s -> %s (%v): %v\n", departure.LineName, departure.Headsign, departure.Time, err) + } } - } else { - update.VehicleStatus.LineName = trips[departure.Order.TripOffset].LineName - update.VehicleStatus.Headsign = trips[departure.Order.TripOffset].Headsign - enrichedDepartures[i] = departure.WithUpdate(update) } + update.VehicleStatus.LineName = trips[departure.Order.TripOffset].LineName + update.VehicleStatus.Headsign = trips[departure.Order.TripOffset].Headsign + enrichedDepartures[i] = departure.WithUpdate(update) } } } - return departures + return enrichedDepartures, nil } func GetAlerts(languages []language.Tag /*, feed*/) []Alert { diff --git a/traffic/realtime_gtfs.go b/traffic/realtime_gtfs.go index 9aee8bd384c480f9c81ce318338fd8d1c77320a2..228b5649e99c90286fe390b7b86806e2e5f98349 100644 --- a/traffic/realtime_gtfs.go +++ b/traffic/realtime_gtfs.go @@ -1,12 +1,11 @@ package traffic import ( - "math" - "apiote.xyz/p/szczanieckiej/gtfs_rt" pb "apiote.xyz/p/szczanieckiej/gtfs_rt/transit_realtime" "fmt" + "math" "time" ) @@ -85,11 +84,11 @@ } } func getGtfsRtData(entities []*pb.FeedEntity) map[RealtimeFeedType]int { - if updates == nil { - updates = map[string]Update{} - } if vehicleStatuses == nil { vehicleStatuses = map[string]VehicleStatus{} + } + if updates == nil { + updates = map[string]Update{} } which := map[RealtimeFeedType]int{} for _, entity := range entities { @@ -121,8 +120,9 @@ if t != nil { which[TRIP_UPDATES] = 1 update := Update{ - StopSequence: t.StopTimeUpdate[0].GetStopSequence(), - StopID: t.StopTimeUpdate[0].GetStopId(), + StopSequence: t.StopTimeUpdate[0].GetStopSequence(), + StopID: t.StopTimeUpdate[0].GetStopId(), + VehicleStatus: vehicleStatuses[*t.Trip.TripId], } if arrival := t.StopTimeUpdate[0].GetArrival(); arrival != nil { update.Delay = *arrival.Delay @@ -148,17 +148,28 @@ } func getGtfsRealtimeMessages(feedType RealtimeFeedType, feedID string, feeds map[RealtimeFeedType]string) error { now := uint64(time.Now().Unix()) - lastUpdated := lastUpdatedGtfsRt[feedID] - if passed := now - lastUpdated[feedType]; passed > 30 { - message, err := gtfs_rt.GetMessages2(feedID, feeds[feedType]) + if lastUpdatedGtfsRt[feedID] == nil { + lastUpdatedGtfsRt[feedID] = map[RealtimeFeedType]uint64{} + } + if passed := now - lastUpdatedGtfsRt[feedID][feedType]; passed > 30 { + message, err := gtfs_rt.GetMessages(feedID, feeds[feedType]) if err != nil { return fmt.Errorf("while getting messages: %w\n", err) } + switch feedType { + case TRIP_UPDATES: + updates = map[string]Update{} + case VEHICLE_POSITIONS: + vehicleStatuses = map[string]VehicleStatus{} + case ALERTS: + // TODO + } + whichUpdated := getGtfsRtData(message.Entity) for key, value := range whichUpdated { if value == 1 { - lastUpdated[key] = now + lastUpdatedGtfsRt[feedID][key] = now } } }