Author: Adam Evyčędo <git@apiote.xyz>
new realtime
api/api.go | 68 +++++++++++++++++------------------------ api/lineResponse.go | 4 +- gtfs_rt/main.go | 29 +--------------- traffic/realtime.go | 27 ++++++++++------ traffic/realtime_gtfs.go | 10 +++--- traffic/realtime_lua.go | 69 ++++++++++++++++++++++++++++++-----------
diff --git a/api/api.go b/api/api.go index 0bf0bc8f46a8dfa1e171d1b1dd4705b3e0b999c1..55a8640c69154d8ae4e52831b53109050183c7eb 100644 --- a/api/api.go +++ b/api/api.go @@ -1,6 +1,6 @@ package api -// todo(BAF10) direction (0|1) to const (TO|BACK) +// TODO(BAF10) direction (0|1) to const (TO|BACK) import ( "apiote.xyz/p/szczanieckiej/traffic" @@ -152,16 +152,14 @@ if err != nil { return VehicleV1{}, fmt.Errorf("while getting line %s: %w", vehicle.LineName, err) } return VehicleV1{ - Id: string(vehicle.VehicleID), - Position: PositionV1{vehicle.Latitude, vehicle.Longitude}, - Capabilities: t.Vehicles[context.FeedID][context.Version][vehicle.VehicleID].Capabilities, - Speed: vehicle.Speed, - Line: LineStubV1{Name: line.Name, Kind: makeLineTypeV1(line), Colour: fromColor(line.Colour)}, - Headsign: vehicle.Headsign, - // todo CongestionLevel - // todo OccupancyStatus - // todo Status - // todo Delay + Id: string(vehicle.VehicleID), + Position: PositionV1{vehicle.Latitude, vehicle.Longitude}, + Capabilities: t.Vehicles[context.FeedID][context.Version][vehicle.VehicleID].Capabilities, + Speed: vehicle.Speed, + Line: LineStubV1{Name: line.Name, Kind: makeLineTypeV1(line), Colour: fromColor(line.Colour)}, + Headsign: vehicle.Headsign, + CongestionLevel: convertCongestionLevelV1(vehicle.CongestionLevel), + OccupancyStatus: convertOccupancyStatusV1(vehicle.OccupancyStatus), }, nil } @@ -171,16 +169,14 @@ if err != nil { return VehicleV2{}, fmt.Errorf("while getting line %s: %w", vehicle.LineName, err) } return VehicleV2{ - Id: string(vehicle.VehicleID), - Position: PositionV1{vehicle.Latitude, vehicle.Longitude}, - Capabilities: t.Vehicles[context.FeedID][context.Version][vehicle.VehicleID].Capabilities, - Speed: vehicle.Speed, - Line: LineStubV2{Name: line.Name, Kind: makeLineTypeV2(line), Colour: fromColor(line.Colour)}, - Headsign: vehicle.Headsign, - // todo CongestionLevel - // todo OccupancyStatus - // todo Status - // todo Delay + Id: string(vehicle.VehicleID), + Position: PositionV1{vehicle.Latitude, vehicle.Longitude}, + Capabilities: t.Vehicles[context.FeedID][context.Version][vehicle.VehicleID].Capabilities, + Speed: vehicle.Speed, + Line: LineStubV2{Name: line.Name, Kind: makeLineTypeV2(line), Colour: fromColor(line.Colour)}, + Headsign: vehicle.Headsign, + CongestionLevel: convertCongestionLevelV1(vehicle.CongestionLevel), + OccupancyStatus: convertOccupancyStatusV1(vehicle.OccupancyStatus), }, nil } @@ -190,16 +186,14 @@ if err != nil { return VehicleV3{}, fmt.Errorf("while getting line %s: %w", vehicle.LineName, err) } return VehicleV3{ - Id: string(vehicle.VehicleID), - Position: PositionV1{vehicle.Latitude, vehicle.Longitude}, - Capabilities: t.Vehicles[context.FeedID][context.Version][vehicle.VehicleID].Capabilities, - Speed: vehicle.Speed, - Line: LineStubV3{Name: line.Name, Kind: makeLineTypeV3(line), Colour: fromColor(line.Colour)}, - Headsign: vehicle.Headsign, - // todo CongestionLevel - // todo OccupancyStatus - // todo Status - // todo Delay + Id: string(vehicle.VehicleID), + Position: PositionV1{vehicle.Latitude, vehicle.Longitude}, + Capabilities: t.Vehicles[context.FeedID][context.Version][vehicle.VehicleID].Capabilities, + Speed: vehicle.Speed, + Line: LineStubV3{Name: line.Name, Kind: makeLineTypeV3(line), Colour: fromColor(line.Colour)}, + Headsign: vehicle.Headsign, + CongestionLevel: convertCongestionLevelV1(vehicle.CongestionLevel), + OccupancyStatus: convertOccupancyStatusV1(vehicle.OccupancyStatus), }, nil } @@ -266,7 +260,7 @@ if stop, ok := item.(traffic.Stop); ok { s := convertTrafficStop(stop) success.Queryables = append(success.Queryables, QueryableV1(s)) } else { - // todo error + continue } } if otherV1, ok := other.(QueryablesResponseV1); ok { @@ -289,7 +283,7 @@ } else if line, ok := item.(traffic.Line); ok { l := convertTrafficLine(line, context.FeedID) success.Queryables = append(success.Queryables, QueryableV2(l)) } else { - // todo error + continue } } if otherV2, ok := other.(QueryablesResponseV2); ok { @@ -317,7 +311,7 @@ } else if line, ok := item.(traffic.Line); ok { l := convertTrafficLineV2(line, context.FeedID) success.Queryables = append(success.Queryables, QueryableV3(l)) } else { - // todo error + continue } } if otherV3, ok := other.(QueryablesResponseDev); ok { @@ -672,8 +666,6 @@ } timeToArrival := departureTime.Sub(datetime).Minutes() if departure.IsRealtime { departure.Status = convertVehicleStatusV1(trafficDeparture.Update.VehicleStatus.Status, timeToArrival) - departure.Vehicle.CongestionLevel = convertCongestionLevelV1(trafficDeparture.Update.VehicleStatus.CongestionLevel) - departure.Vehicle.OccupancyStatus = convertOccupancyStatusV1(trafficDeparture.Update.VehicleStatus.OccupancyStatus) } d = append(d, departure) } @@ -723,8 +715,6 @@ } timeToArrival := departureTime.Sub(datetime).Minutes() if departure.IsRealtime { departure.Status = convertVehicleStatusV1(trafficDeparture.Update.VehicleStatus.Status, timeToArrival) - departure.Vehicle.CongestionLevel = convertCongestionLevelV1(trafficDeparture.Update.VehicleStatus.CongestionLevel) - departure.Vehicle.OccupancyStatus = convertOccupancyStatusV1(trafficDeparture.Update.VehicleStatus.OccupancyStatus) } d = append(d, departure) } @@ -774,8 +764,6 @@ } timeToArrival := departureTime.Sub(datetime).Minutes() if departure.IsRealtime { departure.Status = convertVehicleStatusV1(trafficDeparture.Update.VehicleStatus.Status, timeToArrival) - departure.Vehicle.CongestionLevel = convertCongestionLevelV1(trafficDeparture.Update.VehicleStatus.CongestionLevel) - departure.Vehicle.OccupancyStatus = convertOccupancyStatusV1(trafficDeparture.Update.VehicleStatus.OccupancyStatus) } d = append(d, departure) } diff --git a/api/lineResponse.go b/api/lineResponse.go index 76db96c1e13c112f22038f75dbd8f17f3b1e0888..510096fc1fe49cfa8c8a9aa5c5497df5465c753c 100644 --- a/api/lineResponse.go +++ b/api/lineResponse.go @@ -32,7 +32,7 @@ return LineV2{}, err } return response.(LineResponseDev).Line, nil - // TODO + // TODO(cleaning) move from api/api.go } func makeLineV1(line traffic.Line, context traffic.Context, t *traffic.Traffic) (LineV1, error) { @@ -42,5 +42,5 @@ return LineV1{}, err } return response.(LineResponseV1).Line, nil - // TODO + // TODO(cleaning) move from api/api.go } diff --git a/gtfs_rt/main.go b/gtfs_rt/main.go index 580561e7c941fbcbbf091b23b2c6244263291d89..6db4fa09d1270ea7692573c94c41dd870c8288ce 100644 --- a/gtfs_rt/main.go +++ b/gtfs_rt/main.go @@ -1,30 +1,6 @@ -package gtfs_rt // TODO move to traffic - -import ( - pb "apiote.xyz/p/szczanieckiej/gtfs_rt/transit_realtime" - - "fmt" - "time" - - "golang.org/x/text/language" -) - -type Update struct { // TODO use traffic/Update - VehicleID string - Latitude float32 - Longitude float32 - Speed float32 - StopSeq uint32 - StopID string - Delay int32 - TripUpdate *pb.TripUpdate - Status pb.VehiclePosition_VehicleStopStatus - OccupancyStatus *pb.VehiclePosition_OccupancyStatus - CongestionLevel *pb.VehiclePosition_CongestionLevel - Capabilities uint8 - Time time.Time -} +package gtfs_rt +/* TODO move alerts type Alerts struct { ByRoute map[string][]uint ByTrip map[string][]uint @@ -134,3 +110,4 @@ } } return nil } +*/ diff --git a/traffic/realtime.go b/traffic/realtime.go index 616cf6094665f2a227643163274fdec8a9f8a202..d5d6ae5de9496f7d517707d4d7cd6576bfc69ce7 100644 --- a/traffic/realtime.go +++ b/traffic/realtime.go @@ -110,10 +110,11 @@ } return trip.Id, nil } -func departuresFromNoTripUpdates(updates []Update, timezone *time.Location) ([]DepartureRealtime, error) { +func departuresFromNoTripUpdates(updates map[string]Update, pickups, dropoffs map[string]Boarding, timezone *time.Location) ([]DepartureRealtime, error) { departures := make([]DepartureRealtime, len(updates)) now := time.Now().In(timezone) - for i, update := range updates { + i := 0 + for _, update := range updates { departureTime, err := time.ParseInLocation("150405", update.Time, timezone) if err != nil { return departures, fmt.Errorf("while parsing time: %w", err) @@ -122,8 +123,8 @@ departureTime = time.Date(now.Year(), now.Month(), now.Day(), departureTime.Hour(), departureTime.Minute(), departureTime.Second(), 0, timezone) departures[i] = DepartureRealtime{ Time: departureTime, Departure: Departure{ - Pickup: UNKNOWN_BOARDING, // TODO add in TRAFFIC - Dropoff: UNKNOWN_BOARDING, + Pickup: pickups[update.VehicleStatus.LineName], + Dropoff: dropoffs[update.VehicleStatus.LineName], }, Headsign: update.VehicleStatus.Headsign, LineName: update.VehicleStatus.LineName, @@ -133,6 +134,7 @@ int(time.Now().Unix()), }, Update: update, // NOTE delay must be 0 } + i++ } return departures, nil @@ -147,7 +149,7 @@ log.Printf("while getting feedInfo: %v\n", err) feedInfo = FeedInfo{} } - var enrichMethod func(string, int, string, Context) (Update, []Update, error) + var enrichMethod func(string, int, string, Context) (map[string]Update, bool, error) if feedInfo.Name != "" { if _, ok := feedInfo.RealtimeFeeds[TRIP_UPDATES]; ok { enrichMethod = getGtfsRealtimeUpdates @@ -159,8 +161,12 @@ midnight := time.Date(datetime.Year(), datetime.Month(), datetime.Day(), 0, 0, 0, 0, timezone) if departuresType == DEPARTURES_HYBRID { offsets := make([]uint, len(departures)) + pickups := map[string]Boarding{} + dropoffs := map[string]Boarding{} for i, departure := range departures { offsets[i] = departure.Order.TripOffset + pickups[departure.LineName] = departure.Departure.Pickup + dropoffs[departure.LineName] = departure.Departure.Dropoff } trips, err := GetTripsByOffset(offsets, ctx, func(Trip) bool { return true }) if err != nil { @@ -169,11 +175,11 @@ } for i, departure := range departures { if departure.Time.After(midnight) { var ( - update Update - noTripUpdates []Update + updates map[string]Update + areTripsInTimetable bool ) if enrichMethod != nil { - update, noTripUpdates, err = enrichMethod(trips[departure.Order.TripOffset].Id, departure.Order.Sequence, stopID, ctx) + updates, areTripsInTimetable, err = enrichMethod(trips[departure.Order.TripOffset].Id, departure.Order.Sequence, stopID, ctx) if err != nil { if isTimeout(err) { // TODO or any other connection problem log.Printf("connection error while enriching departure %s -> %s (%v): %v", departure.LineName, departure.Headsign, departure.Time, err) @@ -183,13 +189,14 @@ log.Printf("while enriching departure %s -> %s (%v): %v\n", departure.LineName, departure.Headsign, departure.Time, err) } } } - if len(noTripUpdates) == 0 { + if areTripsInTimetable { + update := updates[trips[departure.Order.TripOffset].Id] update.VehicleStatus.LineName = trips[departure.Order.TripOffset].LineName update.VehicleStatus.Headsign = trips[departure.Order.TripOffset].Headsign enrichedDepartures[i] = departure.WithUpdate(update) } else { var err error - enrichedDepartures, err = departuresFromNoTripUpdates(noTripUpdates, timezone) + enrichedDepartures, err = departuresFromNoTripUpdates(updates, pickups, dropoffs, timezone) if err != nil { return departures, fmt.Errorf("while creating departures without trip: %w", err) } diff --git a/traffic/realtime_gtfs.go b/traffic/realtime_gtfs.go index 1216bd1d3dacef0c628e8f6149da1bbe18621148..b7acbcda465a2014bcd07a70f616b32f64e20993 100644 --- a/traffic/realtime_gtfs.go +++ b/traffic/realtime_gtfs.go @@ -9,6 +9,7 @@ "math" "time" ) +// ........................ feedID var lastUpdatedGtfsRt = map[string]map[RealtimeFeedType]uint64{} func makeTimetableRelationshipFromTripTimetable(r pb.TripDescriptor_ScheduleRelationship) TimetableRelationship { @@ -164,7 +165,7 @@ updates = map[string]Update{} case VEHICLE_POSITIONS: vehicleStatuses = map[string]VehicleStatus{} case ALERTS: - // TODO + // TODO alerts } whichUpdated := getGtfsRtData(message.Entity) @@ -177,10 +178,10 @@ } return nil } -func getGtfsRealtimeUpdates(tripID string, sequence int, stopID string, ctx Context) (Update, []Update, error) { +func getGtfsRealtimeUpdates(_ string, _ int, _ string, ctx Context) (map[string]Update, bool, error) { feedInfo, err := getFeedInfo(ctx.DataHome, ctx.FeedID, ctx.Version) if err != nil { - return Update{}, []Update{}, fmt.Errorf("while getting feedInfo: %w\n", err) + return map[string]Update{}, false, fmt.Errorf("while getting feedInfo: %w\n", err) } getGtfsRealtimeMessages(TRIP_UPDATES, ctx.FeedID, feedInfo.RealtimeFeeds) @@ -189,8 +190,7 @@ // TODO should be moved to enrichDepartures and conditional (this, or custom API) getGtfsRealtimeMessages(VEHICLE_POSITIONS, ctx.FeedID, feedInfo.RealtimeFeeds) } - update := updates[tripID] - return update, []Update{}, nil + return updates, true, nil } func getGtfsRealtimeVehicles(ctx Context) ([]VehicleStatus, error) { diff --git a/traffic/realtime_lua.go b/traffic/realtime_lua.go index cd09c17d9d814855f1e4d5a23e84791be0ca49de..bd8279414edc89014b6bf631202e69c38f881ed7 100644 --- a/traffic/realtime_lua.go +++ b/traffic/realtime_lua.go @@ -6,12 +6,22 @@ "fmt" "net/http" "os" "path/filepath" + "time" "github.com/cjoudrey/gluahttp" "github.com/yuin/gopher-lua" luajson "layeh.com/gopher-json" ) +// ..................... feedID stopID +var lastUpdatedLua = map[string]map[RealtimeFeedType]map[string]uint64{} + +type LuaUpdates struct { + AreTripsInTimetable bool + Updates map[string]Update + // TODO Alerts +} + func isLuaUpdatesScript(context Context) bool { _, err := os.Stat(getLuaUpdatesPath(context)) return err == nil @@ -30,46 +40,67 @@ func getLuaVehiclesPath(context Context) string { return filepath.Join(context.DataHome, context.FeedID, string(context.Version), "vehicles.lua") } -func getLuaRealtimeUpdates(tripID string, sequence int, stopID string, ctx Context) (Update, []Update, error) { - updates := []Update{} +func getLuaRealtimeUpdates(_ string, _ int, stopID string, ctx Context) (map[string]Update, bool, error) { + luaUpdates := LuaUpdates{} filePath := getLuaUpdatesPath(ctx) + now := uint64(time.Now().Unix()) + if lastUpdatedLua[ctx.FeedID] == nil { + lastUpdatedLua[ctx.FeedID] = map[RealtimeFeedType]map[string]uint64{} + } + if lastUpdatedLua[ctx.FeedID][TRIP_UPDATES] == nil { + lastUpdatedLua[ctx.FeedID][TRIP_UPDATES] = map[string]uint64{} + } + if passed := now - lastUpdatedLua[ctx.FeedID][TRIP_UPDATES][stopID]; passed > 30 { + // TODO return from cache + } l := lua.NewState() defer l.Close() l.PreloadModule("json", luajson.Loader) l.PreloadModule("http", gluahttp.NewHttpModule(&http.Client{}).Loader) if err := l.DoFile(filePath); err != nil { - return Update{}, []Update{}, fmt.Errorf("while executing lua script: %w", err) + return map[string]Update{}, false, fmt.Errorf("while executing lua script: %w", err) } - // TODO cache if err := l.CallByParam(lua.P{ Fn: l.GetGlobal("getUpdates"), NRet: 1, Protect: true, }, lua.LString(stopID)); err != nil { - return Update{}, []Update{}, fmt.Errorf("while executing updates function: %w", err) + return map[string]Update{}, false, fmt.Errorf("while executing updates function: %w", err) } result := l.Get(-1) l.Pop(1) - json.Unmarshal([]byte(result.(lua.LString)), &updates) - if len(updates) == 0 { - return Update{}, updates, nil - } else if len(updates) == 1 { - vehicleStatus, err := getLuaRealtimeVehiclesMap(ctx) - if err != nil { - return updates[0], []Update{}, fmt.Errorf("while getting vehicle statuses : %w", err) - } - updates[0].VehicleStatus = vehicleStatus[tripID] - return updates[0], []Update{}, nil - } else { - return Update{}, updates, nil + json.Unmarshal([]byte(result.(lua.LString)), &luaUpdates) + + vehicleStatuses, err := getLuaRealtimeVehiclesMap(ctx) + if err != nil { + return luaUpdates.Updates, false, fmt.Errorf("while getting vehicle statuses : %w", err) + } + for tripID, status := range vehicleStatuses { + u := luaUpdates.Updates[tripID] + u.VehicleStatus = status + luaUpdates.Updates[tripID] = u } + for tripID, update := range luaUpdates.Updates { + updates[tripID] = update + } + return luaUpdates.Updates, luaUpdates.AreTripsInTimetable, nil } func getLuaRealtimeVehiclesMap(ctx Context) (map[string]VehicleStatus, error) { statuses := map[string]VehicleStatus{} filePath := getLuaVehiclesPath(ctx) + now := uint64(time.Now().Unix()) + if lastUpdatedLua[ctx.FeedID] == nil { + lastUpdatedLua[ctx.FeedID] = map[RealtimeFeedType]map[string]uint64{} + } + if lastUpdatedLua[ctx.FeedID][VEHICLE_POSITIONS] == nil { + lastUpdatedLua[ctx.FeedID][VEHICLE_POSITIONS] = map[string]uint64{} + } + if passed := now - lastUpdatedLua[ctx.FeedID][VEHICLE_POSITIONS][""]; passed > 30 { + return vehicleStatuses, nil + } l := lua.NewState() defer l.Close() @@ -79,7 +110,6 @@ if err := l.DoFile(filePath); err != nil { return statuses, fmt.Errorf("while executing lua script: %w", err) } - // TODO cache if err := l.CallByParam(lua.P{ Fn: l.GetGlobal("getVehicles"), NRet: 1, @@ -90,7 +120,8 @@ } result := l.Get(-1) l.Pop(1) json.Unmarshal([]byte(result.(lua.LString)), &statuses) - return statuses, nil + vehicleStatuses = statuses + return vehicleStatuses, nil } func getLuaRealtimeVehicles(ctx Context) ([]VehicleStatus, error) {