Author: Adam <git@apiote.xyz>
initial lua realtime
go.mod | 3 + go.sum | 6 ++ traffic/access.go | 16 +++++ traffic/berlin_vbb.go | 13 +++++ traffic/realtime.go | 106 +++++++++++++++++++++++++++++++--------- traffic/realtime_gtfs.go | 7 +- traffic/realtime_lua.go | 108 ++++++++++++++++++++++++++++++++++++++++++
diff --git a/go.mod b/go.mod index 63d5b72d7c6aaf45287c07ad46c69dfbc1668923..5176471d0d5f2492c33aa5c1224b28281a1f10d0 100644 --- a/go.mod +++ b/go.mod @@ -6,14 +6,17 @@ require ( apiote.xyz/p/gott/v2 v2.0.3 git.sr.ht/~sircmpwn/go-bare v0.0.0-20210406120253-ab86bc2846d9 github.com/adrg/strutil v0.3.0 + github.com/cjoudrey/gluahttp v0.0.0-20201111170219-25003d9adfa9 github.com/dhconnelly/rtreego v1.1.0 github.com/golang/protobuf v1.5.2 github.com/sahilm/fuzzy v0.1.0 github.com/ulikunitz/xz v0.5.10 + github.com/yuin/gopher-lua v1.1.1 golang.org/x/text v0.3.6 golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e google.golang.org/protobuf v1.26.0 gopkg.in/yaml.v3 v3.0.1 + layeh.com/gopher-json v0.0.0-20201124131017-552bb3c4c3bf notabug.org/apiote/gott v1.1.2 ) diff --git a/go.sum b/go.sum index 12aad1e8e2991b464bc6052d8fb9cf19d4c70087..9b3adca23b170eda0f3d1a9b6c31a277809d86d0 100644 --- a/go.sum +++ b/go.sum @@ -5,6 +5,8 @@ git.sr.ht/~sircmpwn/go-bare v0.0.0-20210406120253-ab86bc2846d9 h1:Ahny8Ud1LjVMMAlt8utUFKhhxJtwBAualvsbc/Sk7cE= git.sr.ht/~sircmpwn/go-bare v0.0.0-20210406120253-ab86bc2846d9/go.mod h1:BVJwbDfVjCjoFiKrhkei6NdGcZYpkDkdyCdg1ukytRA= github.com/adrg/strutil v0.3.0 h1:bi/HB2zQbDihC8lxvATDTDzkT4bG7PATtVnDYp5rvq4= github.com/adrg/strutil v0.3.0/go.mod h1:Jz0wzBVE6Uiy9wxo62YEqEY1Nwto3QlLl1Il5gkLKWU= +github.com/cjoudrey/gluahttp v0.0.0-20201111170219-25003d9adfa9 h1:rdWOzitWlNYeUsXmz+IQfa9NkGEq3gA/qQ3mOEqBU6o= +github.com/cjoudrey/gluahttp v0.0.0-20201111170219-25003d9adfa9/go.mod h1:X97UjDTXp+7bayQSFZk2hPvCTmTZIicUjZQRtkwgAKY= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -30,6 +32,8 @@ github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/ulikunitz/xz v0.5.10 h1:t92gobL9l3HE202wg3rlk19F6X+JOxl9BBrCCMYEYd8= github.com/ulikunitz/xz v0.5.10/go.mod h1:nbz6k7qbPmH4IRqmfOplQw/tblSgqTqBwxkY0oWt/14= +github.com/yuin/gopher-lua v1.1.1 h1:kYKnWBjvbNP4XLT3+bPEwAXJx262OhaHDWDVOPjL46M= +github.com/yuin/gopher-lua v1.1.1/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw= golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e h1:FDhOuMEY4JVRztM/gsbk+IKUQ8kj74bxZrgw87eMMVc= @@ -44,5 +48,7 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +layeh.com/gopher-json v0.0.0-20201124131017-552bb3c4c3bf h1:rRz0YsF7VXj9fXRF6yQgFI7DzST+hsI3TeFSGupntu0= +layeh.com/gopher-json v0.0.0-20201124131017-552bb3c4c3bf/go.mod h1:ivKkcY8Zxw5ba0jldhZCYYQfGdb2K6u9tbYK1AwMIBc= notabug.org/apiote/gott v1.1.2 h1:Z22X9/8XrK5M5oARoE2fh3sJGPAJ84GuyGg2nKOjweQ= notabug.org/apiote/gott v1.1.2/go.mod h1:Z9hFvCdzZkFSegBkLa6n0X6AuUiw2BwgG4MFLgBMjD4= diff --git a/traffic/access.go b/traffic/access.go index 6720c4de2981e1232f8f2d85dcecdff024dc11a7..ef323d27334dddd69f504aa179ae8adbfcffe8d2 100644 --- a/traffic/access.go +++ b/traffic/access.go @@ -274,7 +274,7 @@ } func makeDeparturesRealtime(input ...interface{}) (interface{}, error) { result := input[0].(_Result) - departures, err := enrichDepartures(result.Departures, result.Datetime, result.DeparturesType, result.Ctx, result.TripsFile, result.Location) + departures, err := enrichDepartures(result.Stop.Id, result.Departures, result.Datetime, result.DeparturesType, result.Ctx, result.TripsFile, result.Location) result.TripsFile.Close() result.Departures = departures return result, err @@ -847,6 +847,18 @@ return r.(_Result).FeedInfo, nil } } +func GetTrips(ids []string, ctx Context, t *Traffic) (map[string]Trip, error) { // TODO optimise + trips := map[string]Trip{} + for _, id := range ids { + trip, err := GetTrip(id, ctx, t) + if err != nil { + return trips, err + } + trips[trip.Id] = trip + } + return trips, nil +} + func GetTripsByOffset(offsets []uint, context Context, filter func(Trip) bool) (map[uint]Trip, error) { trips := map[uint]Trip{} file, err := os.Open(filepath.Join(context.DataHome, context.FeedID, string(context.Version), "trips.bare")) @@ -1198,7 +1210,7 @@ func GetVehiclesIn(lb, rt Position, context Context, t *Traffic) ([]VehicleStatus, error) { // TODO limit rect size limit := 100.0 - vehiclesRt := getVehiclePositions(context) + vehiclesRt := getVehiclePositions(context, t) vehicles := []VehicleStatus{} rect, err := rtreego.NewRectFromPoints(rtreego.Point{lb.Lat, lb.Lon}, rtreego.Point{rt.Lat, rt.Lon}) diff --git a/traffic/berlin_vbb.go b/traffic/berlin_vbb.go index ad7096c96ec16c145faec13b7976b00615c1cccd..eff340c7c8ba2cda450ff41b352b27d5cecbd3be 100644 --- a/traffic/berlin_vbb.go +++ b/traffic/berlin_vbb.go @@ -53,6 +53,19 @@ func (z VbbBerlin) RealtimeFeeds() map[RealtimeFeedType]string { return map[RealtimeFeedType]string{} } +func (z VbbBerlin) LuaUpdatesScript() string { + return ` + function getUpdates(tripID, sequence) + local http = require("http") + local json = require("json") + + response + + return stopSequence, stopID, delay, time, timetableRelationship + end + ` +} + func (z VbbBerlin) Transformer() transform.Transformer { return transform.Chain(transformers.TransformerDE, transformers.TransformerPL, transformers.TransformerFR) } diff --git a/traffic/realtime.go b/traffic/realtime.go index 5788d124d398bac9d2c2d0f5d293415171e87302..616cf6094665f2a227643163274fdec8a9f8a202 100644 --- a/traffic/realtime.go +++ b/traffic/realtime.go @@ -23,6 +23,7 @@ NO_TRIP_DATA ) type Update struct { + Time string // "HHmmss" StopSequence uint32 StopID string Delay int32 // seconds @@ -40,6 +41,7 @@ Speed float32 // m/s Bearing float64 // radians clockwise from north // TODO maybe (-π, π) LineName string Headsign string + TripID string } type CongestionLevel uint @@ -108,7 +110,35 @@ } return trip.Id, nil } -func enrichDepartures(departures []DepartureRealtime, datetime time.Time, departuresType DeparturesType, ctx Context, tripsFile *os.File, timezone *time.Location) ([]DepartureRealtime, error) { // TODO tripsFile -> map[tripOffset]tripID +func departuresFromNoTripUpdates(updates []Update, timezone *time.Location) ([]DepartureRealtime, error) { + departures := make([]DepartureRealtime, len(updates)) + now := time.Now().In(timezone) + for i, update := range updates { + departureTime, err := time.ParseInLocation("150405", update.Time, timezone) + if err != nil { + return departures, fmt.Errorf("while parsing time: %w", err) + } + departureTime = time.Date(now.Year(), now.Month(), now.Day(), departureTime.Hour(), departureTime.Minute(), departureTime.Second(), 0, timezone) + departures[i] = DepartureRealtime{ + Time: departureTime, + Departure: Departure{ + Pickup: UNKNOWN_BOARDING, // TODO add in TRAFFIC + Dropoff: UNKNOWN_BOARDING, + }, + Headsign: update.VehicleStatus.Headsign, + LineName: update.VehicleStatus.LineName, + Order: StopOrder{ + 0, + int(time.Now().Unix()), + }, + Update: update, // NOTE delay must be 0 + } + } + + return departures, nil +} + +func enrichDepartures(stopID string, departures []DepartureRealtime, datetime time.Time, departuresType DeparturesType, ctx Context, tripsFile *os.File, timezone *time.Location) ([]DepartureRealtime, error) { // TODO tripsFile -> map[tripOffset]tripID enrichedDepartures := make([]DepartureRealtime, len(departures)) feedInfo, err := getFeedInfo(ctx.DataHome, ctx.FeedID, ctx.Version) @@ -117,12 +147,12 @@ log.Printf("while getting feedInfo: %v\n", err) feedInfo = FeedInfo{} } - var enrichMethod func(string, int, Context) (Update, error) + var enrichMethod func(string, int, string, Context) (Update, []Update, error) if feedInfo.Name != "" { if _, ok := feedInfo.RealtimeFeeds[TRIP_UPDATES]; ok { enrichMethod = getGtfsRealtimeUpdates - } else if false /* TODO lua script is there */ { - // TODO enrichMethod = getLuaRealtieUpdates + } else if isLuaUpdatesScript(ctx) { + enrichMethod = getLuaRealtimeUpdates } } midnight := time.Date(datetime.Year(), datetime.Month(), @@ -138,24 +168,33 @@ return departures, fmt.Errorf("while getting trips: %w", err) } for i, departure := range departures { if departure.Time.After(midnight) { - if err != nil { - return departures, fmt.Errorf("while getting trip id for %s -> %s (%v): %w", departure.LineName, departure.Headsign, departure.Time, err) - } - var update Update + var ( + update Update + noTripUpdates []Update + ) if enrichMethod != nil { - update, err = enrichMethod(trips[departure.Order.TripOffset].Id, departure.Order.Sequence, ctx) + update, noTripUpdates, err = enrichMethod(trips[departure.Order.TripOffset].Id, departure.Order.Sequence, stopID, ctx) if err != nil { if isTimeout(err) { // TODO or any other connection problem log.Printf("connection error while enriching departure %s -> %s (%v): %v", departure.LineName, departure.Headsign, departure.Time, err) - break + enrichMethod = nil } else { log.Printf("while enriching departure %s -> %s (%v): %v\n", departure.LineName, departure.Headsign, departure.Time, err) } } } - update.VehicleStatus.LineName = trips[departure.Order.TripOffset].LineName - update.VehicleStatus.Headsign = trips[departure.Order.TripOffset].Headsign - enrichedDepartures[i] = departure.WithUpdate(update) + if len(noTripUpdates) == 0 { + update.VehicleStatus.LineName = trips[departure.Order.TripOffset].LineName + update.VehicleStatus.Headsign = trips[departure.Order.TripOffset].Headsign + enrichedDepartures[i] = departure.WithUpdate(update) + } else { + var err error + enrichedDepartures, err = departuresFromNoTripUpdates(noTripUpdates, timezone) + if err != nil { + return departures, fmt.Errorf("while creating departures without trip: %w", err) + } + break + } } } } @@ -166,26 +205,43 @@ func GetAlerts(languages []language.Tag /*, feed*/) []Alert { return []Alert{} } -func getVehiclePositions(ctx Context) []VehicleStatus { +func getVehiclePositions(ctx Context, t *Traffic) []VehicleStatus { feedInfo, err := getFeedInfo(ctx.DataHome, ctx.FeedID, ctx.Version) if err != nil { log.Printf("while getting feedInfo: %v\n", err) - return []VehicleStatus{} + feedInfo = FeedInfo{} } var function func(Context) ([]VehicleStatus, error) - if _, ok := feedInfo.RealtimeFeeds[VEHICLE_POSITIONS]; ok { - function = getGtfsRealtimeVehicles - } else { - // TODO enrichMethod = getLuaVehicles + if feedInfo.Name != "" { + if _, ok := feedInfo.RealtimeFeeds[VEHICLE_POSITIONS]; ok { + function = getGtfsRealtimeVehicles + } else if isLuaVehiclesScript(ctx) { + function = getLuaRealtimeVehicles + } } - statuses, err := function(ctx) - if err != nil { - if isTimeout(err) { + if function != nil { + statuses, err := function(ctx) + if err != nil { + log.Printf("while getting vehicle positions: %v\n", err) return []VehicleStatus{} - } else { - log.Printf("while getting vehicle positions: %v\n", err) + } + ids := make([]string, len(statuses)) + for i, status := range statuses { + ids[i] = status.TripID } + trips, err := GetTrips(ids, ctx, t) + if err != nil { + log.Printf("while getting trips: %v", err) + return []VehicleStatus{} + } + statusesWithLine := make([]VehicleStatus, len(statuses)) + for i, status := range statuses { + status.LineName = trips[status.TripID].LineName + status.Headsign = trips[status.TripID].Headsign + statusesWithLine[i] = status + } + return statusesWithLine } - return statuses + return []VehicleStatus{} } diff --git a/traffic/realtime_gtfs.go b/traffic/realtime_gtfs.go index 228b5649e99c90286fe390b7b86806e2e5f98349..1216bd1d3dacef0c628e8f6149da1bbe18621148 100644 --- a/traffic/realtime_gtfs.go +++ b/traffic/realtime_gtfs.go @@ -109,6 +109,7 @@ Latitude: float64(v.GetPosition().GetLatitude()), Longitude: float64(v.GetPosition().GetLongitude()), Speed: v.GetPosition().GetSpeed(), Bearing: float64(v.GetPosition().GetBearing() * math.Pi / 180), + TripID: *v.Trip.TripId, } vehicleStatuses[*v.Trip.TripId] = vehicleUpdate @@ -176,10 +177,10 @@ } return nil } -func getGtfsRealtimeUpdates(tripID string, sequence int, ctx Context) (Update, error) { +func getGtfsRealtimeUpdates(tripID string, sequence int, stopID string, ctx Context) (Update, []Update, error) { feedInfo, err := getFeedInfo(ctx.DataHome, ctx.FeedID, ctx.Version) if err != nil { - return Update{}, fmt.Errorf("while getting feedInfo: %w\n", err) + return Update{}, []Update{}, fmt.Errorf("while getting feedInfo: %w\n", err) } getGtfsRealtimeMessages(TRIP_UPDATES, ctx.FeedID, feedInfo.RealtimeFeeds) @@ -189,7 +190,7 @@ getGtfsRealtimeMessages(VEHICLE_POSITIONS, ctx.FeedID, feedInfo.RealtimeFeeds) } update := updates[tripID] - return update, nil + return update, []Update{}, nil } func getGtfsRealtimeVehicles(ctx Context) ([]VehicleStatus, error) { diff --git a/traffic/realtime_lua.go b/traffic/realtime_lua.go new file mode 100644 index 0000000000000000000000000000000000000000..cd09c17d9d814855f1e4d5a23e84791be0ca49de --- /dev/null +++ b/traffic/realtime_lua.go @@ -0,0 +1,108 @@ +package traffic + +import ( + "encoding/json" + "fmt" + "net/http" + "os" + "path/filepath" + + "github.com/cjoudrey/gluahttp" + "github.com/yuin/gopher-lua" + luajson "layeh.com/gopher-json" +) + +func isLuaUpdatesScript(context Context) bool { + _, err := os.Stat(getLuaUpdatesPath(context)) + return err == nil +} + +func isLuaVehiclesScript(context Context) bool { + _, err := os.Stat(getLuaVehiclesPath(context)) + return err == nil +} + +func getLuaUpdatesPath(context Context) string { + return filepath.Join(context.DataHome, context.FeedID, string(context.Version), "updates.lua") +} + +func getLuaVehiclesPath(context Context) string { + return filepath.Join(context.DataHome, context.FeedID, string(context.Version), "vehicles.lua") +} + +func getLuaRealtimeUpdates(tripID string, sequence int, stopID string, ctx Context) (Update, []Update, error) { + updates := []Update{} + filePath := getLuaUpdatesPath(ctx) + + l := lua.NewState() + defer l.Close() + l.PreloadModule("json", luajson.Loader) + l.PreloadModule("http", gluahttp.NewHttpModule(&http.Client{}).Loader) + if err := l.DoFile(filePath); err != nil { + return Update{}, []Update{}, fmt.Errorf("while executing lua script: %w", err) + } + + // TODO cache + if err := l.CallByParam(lua.P{ + Fn: l.GetGlobal("getUpdates"), + NRet: 1, + Protect: true, + }, lua.LString(stopID)); err != nil { + return Update{}, []Update{}, fmt.Errorf("while executing updates function: %w", err) + } + result := l.Get(-1) + l.Pop(1) + json.Unmarshal([]byte(result.(lua.LString)), &updates) + if len(updates) == 0 { + return Update{}, updates, nil + } else if len(updates) == 1 { + vehicleStatus, err := getLuaRealtimeVehiclesMap(ctx) + if err != nil { + return updates[0], []Update{}, fmt.Errorf("while getting vehicle statuses : %w", err) + } + updates[0].VehicleStatus = vehicleStatus[tripID] + return updates[0], []Update{}, nil + } else { + return Update{}, updates, nil + } +} + +func getLuaRealtimeVehiclesMap(ctx Context) (map[string]VehicleStatus, error) { + statuses := map[string]VehicleStatus{} + filePath := getLuaVehiclesPath(ctx) + + l := lua.NewState() + defer l.Close() + l.PreloadModule("json", luajson.Loader) + l.PreloadModule("http", gluahttp.NewHttpModule(&http.Client{}).Loader) + if err := l.DoFile(filePath); err != nil { + return statuses, fmt.Errorf("while executing lua script: %w", err) + } + + // TODO cache + if err := l.CallByParam(lua.P{ + Fn: l.GetGlobal("getVehicles"), + NRet: 1, + Protect: true, + }); err != nil { + return statuses, fmt.Errorf("while executing vehicles function: %w", err) + } + result := l.Get(-1) + l.Pop(1) + json.Unmarshal([]byte(result.(lua.LString)), &statuses) + return statuses, nil +} + +func getLuaRealtimeVehicles(ctx Context) ([]VehicleStatus, error) { + statusesMap, err := getLuaRealtimeVehiclesMap(ctx) + if err != nil { + return []VehicleStatus{}, err + } + + statuses := make([]VehicleStatus, len(statusesMap)) + i := 0 + for _, status := range statusesMap { + statuses[i] = status + } + return statuses, nil +}