Update server HTTPS configuration (#337)

* Merge HTTP and HTTPS routes builder in coordinator

* Extract HTTP/S routes

* Rename config in coordinator

* Generalize child services

* Extract games library helper function

* Use string address instead of port for HTTP/S

* Use a dedicated port extractor function

* Add missing GA tag templating

* Rename shared server address and port params

* Introduce TLS config parameters

* Simplify HTTP/S server constructors

* Update server auto port roll

* Extract init function in worker

* Reorder return params of address type port fn

* Refactor config handler names

* Update TLS default config params

* Extract HTTP to HTTPS redirect function

* Use httpx in monitoring

* Don't log echo requests

* Remove error return from the abstract server

* Add WSS option to the worker-coordinator connection

* Change default worker config

* Make worker send its internal connection params

* Decouple gamelib from the coordinator

* Expose HTTP/S listener address

* Keep original HTTP/S addresses

* Remove no config handler in worker

* Use HTTP-HTTPS redirection

* Wrap net.Listener into a struct

* Clean http server creation fn

* Redirect to https with a generated address

* Use URL in the redirector

* Use zone address param in worker

* Make use of actual addr and port in the monitoring servers

* Use auto-justified monitoring addresses info

* Add the non-HTTPS worker to HTTPS coordinator connection warning

* Embed TLS struct

* Move connection API struct into cws package
This commit is contained in:
sergystepanov 2021-08-09 10:42:06 +03:00 committed by GitHub
parent 2657dfbc70
commit 431d215eee
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
34 changed files with 952 additions and 658 deletions

View file

@ -4,13 +4,11 @@ import (
"context"
goflag "flag"
"math/rand"
"os"
"os/signal"
"syscall"
"time"
config "github.com/giongto35/cloud-game/v2/pkg/config/coordinator"
"github.com/giongto35/cloud-game/v2/pkg/coordinator"
"github.com/giongto35/cloud-game/v2/pkg/os"
"github.com/giongto35/cloud-game/v2/pkg/util/logging"
"github.com/golang/glog"
flag "github.com/spf13/pflag"
@ -18,9 +16,11 @@ import (
var Version = ""
func main() {
func init() {
rand.Seed(time.Now().UTC().UnixNano())
}
func main() {
conf := config.NewConfig()
flag.CommandLine.AddGoFlagSet(goflag.CommandLine)
conf.ParseFlags()
@ -28,29 +28,13 @@ func main() {
logging.Init()
defer logging.Flush()
ctx, cancelCtx := context.WithCancel(context.Background())
glog.Infof("[coordinator] version: %v", Version)
glog.Infof("Initializing coordinator server")
glog.V(4).Infof("Coordinator configs %v", conf)
app := coordinator.New(ctx, conf)
if err := app.Run(); err != nil {
glog.Errorf("Failed to run coordinator server, reason %v", err)
os.Exit(1)
}
c := coordinator.New(conf)
c.Start()
signals := make(chan os.Signal, 1)
done := make(chan struct{}, 1)
signal.Notify(signals, os.Interrupt, syscall.SIGTERM)
go func() {
sig := <-signals
glog.V(4).Infof("[coordinator] Shutting down [os:%v]", sig)
done <- struct{}{}
}()
<-done
app.Shutdown()
ctx, cancelCtx := context.WithCancel(context.Background())
defer c.Shutdown(ctx)
<-os.ExpectTermination()
cancelCtx()
}

View file

@ -4,12 +4,10 @@ import (
"context"
goflag "flag"
"math/rand"
"os"
"os/signal"
"syscall"
"time"
config "github.com/giongto35/cloud-game/v2/pkg/config/worker"
"github.com/giongto35/cloud-game/v2/pkg/os"
"github.com/giongto35/cloud-game/v2/pkg/thread"
"github.com/giongto35/cloud-game/v2/pkg/util/logging"
"github.com/giongto35/cloud-game/v2/pkg/worker"
@ -31,27 +29,14 @@ func run() {
logging.Init()
defer logging.Flush()
ctx, cancelCtx := context.WithCancel(context.Background())
glog.Infof("[worker] version: %v", Version)
glog.V(4).Info("[worker] Initialization")
glog.V(4).Infof("[worker] Local configuration %+v", conf)
app := worker.New(ctx, conf)
app.Run()
wrk := worker.New(conf)
wrk.Start()
signals := make(chan os.Signal, 1)
done := make(chan struct{}, 1)
signal.Notify(signals, os.Interrupt, syscall.SIGTERM)
go func() {
sig := <-signals
glog.V(4).Infof("[worker] Shutting down [os:%v]", sig)
done <- struct{}{}
}()
<-done
app.Shutdown()
ctx, cancelCtx := context.WithCancel(context.Background())
defer wrk.Shutdown(ctx)
<-os.ExpectTermination()
cancelCtx()
}

45
configs/config.yaml vendored
View file

@ -3,6 +3,7 @@
#
# application environment (dev, staging, prod)
# deprecated
environment: dev
coordinator:
@ -31,16 +32,20 @@ coordinator:
profilingEnabled: false
metricEnabled: false
urlPrefix: /coordinator
# the public domain of the coordinator
publicDomain: http://localhost:8000
# specify the worker address that the client can ping (with protocol and port)
pingServer:
# HTTP(S) server config
server:
port: 8000
httpsPort: 443
httpsKey:
httpsChain:
address: :8000
https: false
# Letsencrypt or self cert config
tls:
address: :443
letsencryptUrl:
# allowed host name
domain:
# if both are set then will use certs
# and Letsencryt instead
httpsCert:
httpsKey:
analytics:
inject: false
gtag:
@ -49,6 +54,12 @@ worker:
network:
# a coordinator address to connect to
coordinatorAddress: localhost:8000
# where to connect
endpoint: /wso
# ping endpoint
pingEndpoint: /echo
# make coordinator connection secure (wss)
secure: false
# ISO Alpha-2 country code to group workers by zones
zone:
monitoring:
@ -56,13 +67,21 @@ worker:
port: 6602
profilingEnabled: false
# monitoring server URL prefix
metricEnabled: true
metricEnabled: false
urlPrefix: /worker
server:
port: 9000
httpsPort: 443
httpsKey:
httpsChain:
address: :9000
https: false
tls:
address: :444
# LetsEncrypt config
# allowed host name
domain:
# if empty will use URL from Go
letsencryptUrl:
# Own certs config
httpsCert:
httpsKey:
emulator:
# set output viewport scale factor

View file

@ -12,13 +12,11 @@ import (
type Config struct {
Coordinator struct {
Analytics Analytics
PublicDomain string
PingServer string
DebugHost string
Library games.Config
Monitoring monitoring.ServerMonitoringConfig
Server shared.Server
DebugHost string
Library games.Config
Monitoring monitoring.Config
Server shared.Server
Analytics Analytics
}
Emulator emulator.Emulator
Environment shared.Environment

View file

@ -53,7 +53,7 @@ type LibretroCoreConfig struct {
}
// GetLibretroCoreConfig returns a core config with expanded paths.
func (e *Emulator) GetLibretroCoreConfig(emulator string) LibretroCoreConfig {
func (e Emulator) GetLibretroCoreConfig(emulator string) LibretroCoreConfig {
cores := e.Libretro.Cores
conf := cores.List[emulator]
conf.Lib = path.Join(cores.Paths.Libs, conf.Lib)
@ -65,7 +65,7 @@ func (e *Emulator) GetLibretroCoreConfig(emulator string) LibretroCoreConfig {
// GetEmulatorByRom returns emulator name by its supported ROM name.
// !to cache into an optimized data structure
func (e *Emulator) GetEmulatorByRom(rom string) string {
func (e Emulator) GetEmulatorByRom(rom string) string {
for emu, core := range e.Libretro.Cores.List {
for _, romName := range core.Roms {
if rom == romName {
@ -76,7 +76,7 @@ func (e *Emulator) GetEmulatorByRom(rom string) string {
return ""
}
func (e *Emulator) GetSupportedExtensions() []string {
func (e Emulator) GetSupportedExtensions() []string {
var extensions []string
for _, core := range e.Libretro.Cores.List {
extensions = append(extensions, core.Roms...)

View file

@ -1,8 +1,10 @@
package monitoring
type ServerMonitoringConfig struct {
type Config struct {
Port int
URLPrefix string
MetricEnabled bool `json:"metric_enabled"`
ProfilingEnabled bool `json:"profiling_enabled"`
}
func (c *Config) IsEnabled() bool { return c.MetricEnabled || c.ProfilingEnabled }

View file

@ -8,17 +8,29 @@ import (
type Environment environment.Env
type Server struct {
Port int
HttpsPort int
HttpsKey string
HttpsChain string
Address string
Https bool
Tls struct {
Address string
Domain string
LetsencryptUrl string
HttpsKey string
HttpsCert string
}
}
func (s *Server) WithFlags() {
flag.IntVar(&s.Port, "port", s.Port, "HTTP server port")
flag.IntVar(&s.HttpsPort, "httpsPort", s.HttpsPort, "HTTPS server port (just why?)")
flag.StringVar(&s.HttpsKey, "httpsKey", s.HttpsKey, "HTTPS key")
flag.StringVar(&s.HttpsChain, "httpsChain", s.HttpsChain, "HTTPS chain")
flag.StringVar(&s.Address, "address", s.Address, "HTTP server address (host:port)")
flag.StringVar(&s.Tls.Address, "httpsAddress", s.Tls.Address, "HTTPS server address (host:port)")
flag.StringVar(&s.Tls.HttpsKey, "httpsKey", s.Tls.HttpsKey, "HTTPS key")
flag.StringVar(&s.Tls.HttpsCert, "httpsCert", s.Tls.HttpsCert, "HTTPS chain")
}
func (s *Server) GetAddr() string {
if s.Https {
return s.Tls.Address
}
return s.Address
}
func (env *Environment) Get() environment.Env {

View file

@ -1,8 +1,8 @@
package worker
import (
"encoding/json"
"log"
"net/url"
"strings"
"github.com/giongto35/cloud-game/v2/pkg/config"
@ -19,34 +19,31 @@ type Config struct {
Encoder encoder.Encoder
Emulator emulator.Emulator
Environment shared.Environment
Worker struct {
Monitoring monitoring.ServerMonitoringConfig
Network struct {
CoordinatorAddress string
Zone string
}
Server shared.Server
Worker Worker
Webrtc webrtcConfig.Webrtc
}
type Worker struct {
Monitoring monitoring.Config
Network struct {
CoordinatorAddress string
Endpoint string
PingEndpoint string
Secure bool
Zone string
}
Webrtc webrtcConfig.Webrtc
Loaded bool
Server shared.Server
}
// allows custom config path
var configPath string
func NewConfig() (conf Config) {
if err := config.LoadConfig(&conf, configPath); err == nil {
conf.Loaded = true
}
_ = config.LoadConfig(&conf, configPath)
conf.expandSpecialTags()
return
}
func EmptyConfig() (conf Config) {
conf.Loaded = false
return
}
// ParseFlags updates config values from passed runtime flags.
// Define own flags with default value set to the current config param.
// Don't forget to call flag.Parse().
@ -60,18 +57,6 @@ func (c *Config) ParseFlags() {
flag.Parse()
}
func (c *Config) Serialize() []byte {
res, _ := json.Marshal(c)
return res
}
func (c *Config) Deserialize(data []byte) {
if err := json.Unmarshal(data, c); err == nil {
c.Loaded = true
}
c.expandSpecialTags()
}
// expandSpecialTags replaces all the special tags in the config.
func (c *Config) expandSpecialTags() {
// home dir
@ -87,3 +72,15 @@ func (c *Config) expandSpecialTags() {
}
}
}
// GetAddr returns defined in the config server address.
func (w *Worker) GetAddr() string { return w.Server.GetAddr() }
// GetPingAddr returns exposed to clients server ping endpoint address.
func (w *Worker) GetPingAddr(address string) string {
pingURL := url.URL{Scheme: "http", Host: address, Path: w.Network.PingEndpoint}
if w.Server.Https {
pingURL.Scheme = "https"
}
return pingURL.String()
}

View file

@ -1,166 +1,27 @@
package coordinator
import (
"context"
"crypto/tls"
"fmt"
"log"
"net/http"
"strconv"
"time"
"github.com/giongto35/cloud-game/v2/pkg/config/coordinator"
"github.com/giongto35/cloud-game/v2/pkg/environment"
"github.com/giongto35/cloud-game/v2/pkg/games"
"github.com/giongto35/cloud-game/v2/pkg/monitoring"
"github.com/golang/glog"
"github.com/gorilla/mux"
"golang.org/x/crypto/acme"
"golang.org/x/crypto/acme/autocert"
"github.com/giongto35/cloud-game/v2/pkg/service"
)
const stagingLEURL = "https://acme-staging-v02.api.letsencrypt.org/directory"
type Coordinator struct {
ctx context.Context
cfg coordinator.Config
monitoringServer *monitoring.ServerMonitoring
}
func New(ctx context.Context, cfg coordinator.Config) *Coordinator {
return &Coordinator{
ctx: ctx,
cfg: cfg,
monitoringServer: monitoring.NewServerMonitoring(cfg.Coordinator.Monitoring, "cord"),
}
}
func (c *Coordinator) Run() error {
go c.initializeCoordinator()
go c.RunMonitoringServer()
return nil
}
func (c *Coordinator) RunMonitoringServer() {
glog.Infoln("Starting monitoring server for coordinator")
err := c.monitoringServer.Run()
func New(conf coordinator.Config) (services service.Group) {
srv := NewServer(conf, games.NewLibWhitelisted(conf.Coordinator.Library, conf.Emulator))
httpSrv, err := NewHTTPServer(conf, func(mux *http.ServeMux) {
mux.HandleFunc("/ws", srv.WS)
mux.HandleFunc("/wso", srv.WSO)
})
if err != nil {
glog.Errorf("Failed to start monitoring server, reason %s", err)
}
}
func (c *Coordinator) Shutdown() {
if err := c.monitoringServer.Shutdown(c.ctx); err != nil {
glog.Errorln("Failed to shutdown monitoring server")
}
}
func makeServerFromMux(mux *http.ServeMux) *http.Server {
// set timeouts so that a slow or malicious client doesn't
// hold resources forever
return &http.Server{
ReadTimeout: 5 * time.Second,
WriteTimeout: 5 * time.Second,
IdleTimeout: 120 * time.Second,
Handler: mux,
}
}
func makeHTTPServer(server *Server) *http.Server {
r := mux.NewRouter()
r.HandleFunc("/ws", server.WS)
r.HandleFunc("/wso", server.WSO)
r.PathPrefix("/static/").Handler(http.StripPrefix("/static/", http.FileServer(http.Dir("./web"))))
r.PathPrefix("/").Handler(server.GetWeb(server.cfg))
svmux := &http.ServeMux{}
svmux.Handle("/", r)
return makeServerFromMux(svmux)
}
func makeHTTPToHTTPSRedirectServer(server *Server) *http.Server {
handleRedirect := func(w http.ResponseWriter, r *http.Request) {
newURI := "https://" + r.Host + r.URL.String()
http.Redirect(w, r, newURI, http.StatusFound)
}
r := mux.NewRouter()
r.PathPrefix("/").HandlerFunc(handleRedirect)
svmux := &http.ServeMux{}
svmux.Handle("/", r)
return makeServerFromMux(svmux)
}
// initializeCoordinator setup an coordinator server
func (c *Coordinator) initializeCoordinator() {
// init games library
libraryConf := c.cfg.Coordinator.Library
if len(libraryConf.Supported) == 0 {
libraryConf.Supported = c.cfg.Emulator.GetSupportedExtensions()
}
lib := games.NewLibrary(libraryConf)
lib.Scan()
server := NewServer(c.cfg, lib)
var certManager *autocert.Manager
var httpsSrv *http.Server
log.Println("Initializing Coordinator Server")
mode := c.cfg.Environment.Get()
if mode.AnyOf(environment.Production, environment.Staging) {
serverConfig := c.cfg.Coordinator.Server
httpsSrv = makeHTTPServer(server)
httpsSrv.Addr = fmt.Sprintf(":%d", serverConfig.HttpsPort)
if serverConfig.HttpsChain == "" || serverConfig.HttpsKey == "" {
serverConfig.HttpsChain = ""
serverConfig.HttpsKey = ""
var leurl string
if mode == environment.Staging {
leurl = stagingLEURL
} else {
leurl = acme.LetsEncryptURL
}
certManager = &autocert.Manager{
Prompt: autocert.AcceptTOS,
HostPolicy: autocert.HostWhitelist(c.cfg.Coordinator.PublicDomain),
Cache: autocert.DirCache("assets/cache"),
Client: &acme.Client{DirectoryURL: leurl},
}
httpsSrv.TLSConfig = &tls.Config{GetCertificate: certManager.GetCertificate}
}
go func(chain string, key string) {
fmt.Printf("Starting HTTPS server on %s\n", httpsSrv.Addr)
err := httpsSrv.ListenAndServeTLS(chain, key)
if err != nil {
log.Fatalf("httpsSrv.ListendAndServeTLS() failed with %s", err)
}
}(serverConfig.HttpsChain, serverConfig.HttpsKey)
}
var httpSrv *http.Server
if mode.AnyOf(environment.Production, environment.Staging) {
httpSrv = makeHTTPToHTTPSRedirectServer(server)
} else {
httpSrv = makeHTTPServer(server)
}
if certManager != nil {
httpSrv.Handler = certManager.HTTPHandler(httpSrv.Handler)
}
httpSrv.Addr = ":" + strconv.Itoa(c.cfg.Coordinator.Server.Port)
err := httpSrv.ListenAndServe()
if err != nil {
log.Fatalf("httpSrv.ListenAndServe() failed with %s", err)
log.Fatalf("http init fail: %v", err)
}
services.Add(srv, httpSrv)
if conf.Coordinator.Monitoring.IsEnabled() {
services.Add(monitoring.New(conf.Coordinator.Monitoring, httpSrv.GetHost(), "cord"))
}
return
}

View file

@ -3,8 +3,6 @@ package coordinator
import (
"encoding/json"
"errors"
"fmt"
"html/template"
"log"
"math"
"net/http"
@ -16,14 +14,15 @@ import (
"github.com/giongto35/cloud-game/v2/pkg/environment"
"github.com/giongto35/cloud-game/v2/pkg/games"
"github.com/giongto35/cloud-game/v2/pkg/ice"
"github.com/giongto35/cloud-game/v2/pkg/service"
"github.com/giongto35/cloud-game/v2/pkg/util"
"github.com/gofrs/uuid"
"github.com/gorilla/websocket"
)
const index = "./web/index.html"
type Server struct {
service.Service
cfg coordinator.Config
// games library
library games.GameLibrary
@ -35,12 +34,12 @@ type Server struct {
browserClients map[string]*BrowserClient
}
const pingServerTemp = "https://%s.%s/echo"
const devPingServer = "http://localhost:9000/echo"
var upgrader = websocket.Upgrader{}
func NewServer(cfg coordinator.Config, library games.GameLibrary) *Server {
// scan the lib right away
library.Scan()
return &Server{
cfg: cfg,
library: library,
@ -53,39 +52,20 @@ func NewServer(cfg coordinator.Config, library games.GameLibrary) *Server {
}
}
// GetWeb returns web frontend
func (s *Server) GetWeb(conf coordinator.Config) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
tpl, err := template.ParseFiles(index)
if err != nil {
log.Fatal(err)
}
// render index page with some tpl values
if err = tpl.Execute(w, conf.Coordinator.Analytics); err != nil {
log.Fatal(err)
}
})
}
// getPingServer returns the server for latency check of a zone.
// In latency check to find best worker step, we use this server to find the closest worker.
func (s *Server) getPingServer(zone string) string {
if s.cfg.Coordinator.PingServer != "" {
return fmt.Sprintf("%s/echo", s.cfg.Coordinator.PingServer)
}
mode := s.cfg.Environment.Get()
if mode.AnyOf(environment.Production, environment.Staging) {
return fmt.Sprintf(pingServerTemp, zone, s.cfg.Coordinator.PublicDomain)
}
return devPingServer
}
// WSO handles all connections from a new worker to coordinator
func (s *Server) WSO(w http.ResponseWriter, r *http.Request) {
log.Println("Coordinator: A worker is connecting...")
connRt, err := GetConnectionRequest(r.URL.Query().Get("data"))
if err != nil {
log.Printf("Coordinator: got a malformed request: %v", err.Error())
return
}
if s.cfg.Coordinator.Server.Https && !connRt.IsHTTPS {
log.Printf("Warning! Unsecure connection. The worker may not work properly without HTTPS on its side!")
}
// be aware of ReadBufferSize, WriteBufferSize (default 4096)
// https://pkg.go.dev/github.com/gorilla/websocket?tab=doc#Upgrader
c, err := upgrader.Upgrade(w, r, nil)
@ -107,20 +87,17 @@ func (s *Server) WSO(w http.ResponseWriter, r *http.Request) {
// Create a workerClient instance
wc := NewWorkerClient(c, workerID)
wc.Println("Generated worker ID")
wc.Zone = connRt.Zone
wc.PingServer = connRt.PingAddr
// Register to workersClients map the client connection
address := util.GetRemoteAddress(c)
wc.Println("Address:", address)
// Zone of the worker
zone := r.URL.Query().Get("zone")
wc.Printf("Is public: %v zone: %v", util.IsPublicIP(address), zone)
public := util.IsPublicIP(address)
pingServer := s.getPingServer(zone)
wc.Printf("Set ping server address: %s", pingServer)
wc.Printf("addr: %v | zone: %v | pub: %v | ping: %v", address, wc.Zone, public, wc.PingServer)
// In case worker and coordinator in the same host
if !util.IsPublicIP(address) && s.cfg.Environment.Get() == environment.Production {
if !public && s.cfg.Environment.Get() == environment.Production {
// Don't accept private IP for worker's address in prod mode
// However, if the worker in the same host with coordinator, we can get public IP of worker
wc.Printf("[!] Address %s is invalid", address)
@ -138,8 +115,6 @@ func (s *Server) WSO(w http.ResponseWriter, r *http.Request) {
// Create a workerClient instance
wc.Address = address
wc.StunTurnServer = ice.ToJson(s.cfg.Webrtc.IceServers, ice.Replacement{From: "server-ip", To: address})
wc.Zone = zone
wc.PingServer = pingServer
// Attach to Server instance with workerID, add defer
s.workerClients[workerID] = wc

47
pkg/coordinator/http.go Normal file
View file

@ -0,0 +1,47 @@
package coordinator
import (
"html/template"
"log"
"net/http"
"github.com/giongto35/cloud-game/v2/pkg/config/coordinator"
"github.com/giongto35/cloud-game/v2/pkg/network/httpx"
)
func NewHTTPServer(conf coordinator.Config, fnMux func(mux *http.ServeMux)) (*httpx.Server, error) {
return httpx.NewServer(
conf.Coordinator.Server.GetAddr(),
func(*httpx.Server) http.Handler {
h := http.NewServeMux()
h.Handle("/", index(conf))
h.Handle("/static/", static("./web"))
fnMux(h)
return h
},
httpx.WithServerConfig(conf.Coordinator.Server),
)
}
func index(conf coordinator.Config) http.Handler {
tpl, err := template.ParseFiles("./web/index.html")
if err != nil {
log.Fatal(err)
}
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// return 404 on unknown
if r.URL.Path != "/" {
http.NotFound(w, r)
return
}
// render index page with some tpl values
if err = tpl.Execute(w, conf.Coordinator.Analytics); err != nil {
log.Fatal(err)
}
})
}
func static(dir string) http.Handler {
return http.StripPrefix("/static/", http.FileServer(http.Dir(dir)))
}

View file

@ -1,27 +1,33 @@
package coordinator
import (
"encoding/base64"
"encoding/json"
"log"
"github.com/giongto35/cloud-game/v2/pkg/config/worker"
"github.com/giongto35/cloud-game/v2/pkg/cws"
"github.com/giongto35/cloud-game/v2/pkg/cws/api"
)
func (wc *WorkerClient) handleConfigRequest() cws.PacketHandler {
return func(resp cws.WSPacket) cws.WSPacket {
// try to load worker config
conf := worker.NewConfig()
return api.ConfigRequestPacket(conf.Serialize())
}
}
func (wc *WorkerClient) handleHeartbeat() cws.PacketHandler {
return func(resp cws.WSPacket) cws.WSPacket {
return resp
}
}
func GetConnectionRequest(data string) (api.ConnectionRequest, error) {
req := api.ConnectionRequest{}
if data == "" {
return req, nil
}
decodeString, err := base64.URLEncoding.DecodeString(data)
if err != nil {
return req, err
}
err = json.Unmarshal(decodeString, &req)
return req, err
}
// handleRegisterRoom event from a worker, when worker created a new room.
// RoomID is global so it is managed by coordinator.
func (wc *WorkerClient) handleRegisterRoom(s *Server) cws.PacketHandler {

View file

@ -7,7 +7,6 @@ func (s *Server) workerRoutes(wc *WorkerClient) {
if wc == nil {
return
}
wc.Receive(api.ConfigRequest, wc.handleConfigRequest())
wc.Receive(api.Heartbeat, wc.handleHeartbeat())
wc.Receive(api.RegisterRoom, wc.handleRegisterRoom(s))
wc.Receive(api.GetRoom, wc.handleGetRoom(s))

View file

@ -14,7 +14,7 @@ type WorkerClient struct {
WorkerID string
Address string // ip address of worker
// public server used for ping check (Cannot use worker address because they are not publicly exposed)
// public server used for ping check
PingServer string
StunTurnServer string
userCount int // may be atomic

View file

@ -3,12 +3,11 @@ package api
import "github.com/giongto35/cloud-game/v2/pkg/cws"
const (
ConfigRequest = "config_request"
GetRoom = "get_room"
CloseRoom = "close_room"
RegisterRoom = "register_room"
Heartbeat = "heartbeat"
IceCandidate = "ice_candidate"
GetRoom = "get_room"
CloseRoom = "close_room"
RegisterRoom = "register_room"
Heartbeat = "heartbeat"
IceCandidate = "ice_candidate"
NoData = ""
@ -38,10 +37,14 @@ type GameStartCall struct {
func (packet *GameStartCall) From(data string) error { return from(packet, data) }
func (packet *GameStartCall) To() (string, error) { return to(packet) }
//
// *** packets ***
//
func ConfigPacket() cws.WSPacket { return cws.WSPacket{ID: ConfigRequest} }
type ConnectionRequest struct {
Zone string `json:"zone,omitempty"`
PingAddr string `json:"ping_addr,omitempty"`
IsHTTPS bool `json:"is_https,omitempty"`
}
// packets
func RegisterRoomPacket(data string) cws.WSPacket { return cws.WSPacket{ID: RegisterRoom, Data: data} }
func GetRoomPacket(data string) cws.WSPacket { return cws.WSPacket{ID: GetRoom, Data: data} }
func CloseRoomPacket(data string) cws.WSPacket { return cws.WSPacket{ID: CloseRoom, Data: data} }

View file

@ -16,7 +16,7 @@ import (
// Config is an external configuration
type Config struct {
// some directory which is gonna be
// some directory which is going to be
// the root folder for the library
BasePath string
// a list of supported file extensions
@ -64,6 +64,10 @@ type GameLibrary interface {
Scan()
}
type FileExtensionWhitelist interface {
GetSupportedExtensions() []string
}
type GameMetadata struct {
uid string
// the display name of the game
@ -74,7 +78,11 @@ type GameMetadata struct {
Path string
}
func NewLibrary(conf Config) GameLibrary {
func (c Config) GetSupportedExtensions() []string { return c.Supported }
func NewLib(conf Config) GameLibrary { return NewLibWhitelisted(conf, conf) }
func NewLibWhitelisted(conf Config, filter FileExtensionWhitelist) GameLibrary {
hasSource := true
dir, err := filepath.Abs(conf.BasePath)
if err != nil {
@ -82,6 +90,10 @@ func NewLibrary(conf Config) GameLibrary {
log.Printf("[lib] invalid source: %v (%v)\n", conf.BasePath, err)
}
if len(conf.Supported) == 0 {
conf.Supported = filter.GetSupportedExtensions()
}
library := &library{
config: libConf{
path: dir,

View file

@ -19,7 +19,7 @@ func TestLibraryScan(t *testing.T) {
}
for _, test := range tests {
library := NewLibrary(Config{
library := NewLib(Config{
BasePath: test.directory,
Supported: []string{"gba", "zip", "nes"},
Ignored: []string{"neogeo", "pgm"},

View file

@ -3,104 +3,109 @@ package monitoring
import (
"context"
"fmt"
"log"
"math"
"net"
"net/http"
"net/http/pprof"
"strconv"
"strings"
"github.com/giongto35/cloud-game/v2/pkg/config/monitoring"
config "github.com/giongto35/cloud-game/v2/pkg/config/worker"
"github.com/golang/glog"
"github.com/giongto35/cloud-game/v2/pkg/network/httpx"
"github.com/giongto35/cloud-game/v2/pkg/service"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
type ServerMonitoring struct {
cfg monitoring.ServerMonitoringConfig
const debugEndpoint = "/debug/pprof"
const metricsEndpoint = "/metrics"
type Monitoring struct {
service.RunnableService
conf monitoring.Config
tag string
server *http.Server
server *httpx.Server
}
func NewServerMonitoring(cfg monitoring.ServerMonitoringConfig, tag string) *ServerMonitoring {
return &ServerMonitoring{cfg: validate(&cfg), tag: tag}
}
func (sm *ServerMonitoring) Init(conf interface{}) error {
cfg := conf.(config.Config).Worker.Monitoring
sm.cfg = validate(&cfg)
return nil
}
func (sm *ServerMonitoring) Run() error {
if sm.cfg.ProfilingEnabled || sm.cfg.MetricEnabled {
monitoringServerMux := http.NewServeMux()
srv := http.Server{
Addr: fmt.Sprintf(":%d", sm.cfg.Port),
Handler: monitoringServerMux,
}
sm.server = &srv
glog.Infof("[%v] Starting monitoring server at %v", sm.tag, srv.Addr)
if sm.cfg.ProfilingEnabled {
pprofPath := fmt.Sprintf("%s/debug/pprof", sm.cfg.URLPrefix)
glog.Infof("[%v] Profiling is enabled at %v", sm.tag, srv.Addr+pprofPath)
monitoringServerMux.Handle(pprofPath+"/", http.HandlerFunc(pprof.Index))
monitoringServerMux.Handle(pprofPath+"/cmdline", http.HandlerFunc(pprof.Cmdline))
monitoringServerMux.Handle(pprofPath+"/profile", http.HandlerFunc(pprof.Profile))
monitoringServerMux.Handle(pprofPath+"/symbol", http.HandlerFunc(pprof.Symbol))
monitoringServerMux.Handle(pprofPath+"/trace", http.HandlerFunc(pprof.Trace))
// pprof handler for custom pprof path needs to be explicitly specified, according to: https://github.com/gin-contrib/pprof/issues/8 . Don't know why this is not fired as ticket
// https://golang.org/src/net/http/pprof/pprof.go?s=7411:7461#L305 only render index page
monitoringServerMux.Handle(pprofPath+"/allocs", pprof.Handler("allocs"))
monitoringServerMux.Handle(pprofPath+"/block", pprof.Handler("block"))
monitoringServerMux.Handle(pprofPath+"/goroutine", pprof.Handler("goroutine"))
monitoringServerMux.Handle(pprofPath+"/heap", pprof.Handler("heap"))
monitoringServerMux.Handle(pprofPath+"/mutex", pprof.Handler("mutex"))
monitoringServerMux.Handle(pprofPath+"/threadcreate", pprof.Handler("threadcreate"))
}
if sm.cfg.MetricEnabled {
metricPath := fmt.Sprintf("%s/metrics", sm.cfg.URLPrefix)
glog.Infof("[%v] Prometheus metric is enabled at %v", sm.tag, srv.Addr+metricPath)
monitoringServerMux.Handle(metricPath, promhttp.Handler())
}
err := srv.ListenAndServe()
if err == http.ErrServerClosed {
glog.Infof("[%v] The main HTTP server has been closed", sm.tag)
return nil
}
return err
// New creates new monitoring service.
// The tag param specifies owner label for logs.
func New(conf monitoring.Config, baseAddr string, tag string) *Monitoring {
serv, err := httpx.NewServer(
net.JoinHostPort(baseAddr, strconv.Itoa(conf.Port)),
func(*httpx.Server) http.Handler {
h := http.NewServeMux()
if conf.ProfilingEnabled {
prefix := conf.URLPrefix + debugEndpoint
h.HandleFunc(prefix+"/", pprof.Index)
h.HandleFunc(prefix+"/cmdline", pprof.Cmdline)
h.HandleFunc(prefix+"/profile", pprof.Profile)
h.HandleFunc(prefix+"/symbol", pprof.Symbol)
h.HandleFunc(prefix+"/trace", pprof.Trace)
h.Handle(prefix+"/allocs", pprof.Handler("allocs"))
h.Handle(prefix+"/block", pprof.Handler("block"))
h.Handle(prefix+"/goroutine", pprof.Handler("goroutine"))
h.Handle(prefix+"/heap", pprof.Handler("heap"))
h.Handle(prefix+"/mutex", pprof.Handler("mutex"))
h.Handle(prefix+"/threadcreate", pprof.Handler("threadcreate"))
}
if conf.MetricEnabled {
h.Handle(conf.URLPrefix+metricsEndpoint, promhttp.Handler())
}
return h
},
httpx.WithPortRoll(true))
if err != nil {
log.Fatalf("couldn't start monitoring server: %v", err)
}
return nil
return &Monitoring{conf: conf, tag: tag, server: serv}
}
func (sm *ServerMonitoring) Shutdown(ctx context.Context) error {
if sm.server == nil {
return nil
func (m *Monitoring) Run() {
m.printInfo()
m.server.Run()
}
func (m *Monitoring) Shutdown(ctx context.Context) error {
log.Printf("[%v] Shutting down monitoring server", m.tag)
return m.server.Shutdown(ctx)
}
func (m *Monitoring) String() string {
return fmt.Sprintf("monitoring::%s:%d", m.conf.URLPrefix, m.conf.Port)
}
func (m *Monitoring) GetMetricsPublicAddress() string {
return m.server.GetProtocol() + "://" + m.server.Addr + m.conf.URLPrefix + metricsEndpoint
}
func (m *Monitoring) GetProfilingAddress() string {
return m.server.GetProtocol() + "://" + m.server.Addr + m.conf.URLPrefix + debugEndpoint
}
func (m *Monitoring) printInfo() {
length, pad := 42, 20
var table, records strings.Builder
table.Grow(length * 4)
records.Grow(length * 2)
if m.conf.ProfilingEnabled {
addr := m.GetProfilingAddress()
length = int(math.Max(float64(length), float64(len(addr)+pad)))
records.WriteString(" Profiling " + addr + "\n")
}
if m.conf.MetricEnabled {
addr := m.GetMetricsPublicAddress()
length = int(math.Max(float64(length), float64(len(addr)+pad)))
records.WriteString(" Prometheus " + addr + "\n")
}
glog.Infof("[%v] Shutting down monitoring server", sm.tag)
return sm.server.Shutdown(ctx)
}
func (sm *ServerMonitoring) String() string {
return fmt.Sprintf("monitoring::%s:%d", sm.cfg.URLPrefix, sm.cfg.Port)
}
func validate(conf *monitoring.ServerMonitoringConfig) monitoring.ServerMonitoringConfig {
if conf.Port == 0 {
conf.Port = 6365
}
if len(conf.URLPrefix) > 0 {
conf.URLPrefix = strings.TrimSpace(conf.URLPrefix)
if !strings.HasPrefix(conf.URLPrefix, "/") {
conf.URLPrefix = "/" + conf.URLPrefix
}
if strings.HasSuffix(conf.URLPrefix, "/") {
conf.URLPrefix = strings.TrimSuffix(conf.URLPrefix, "/")
}
}
return *conf
title := "Monitoring"
edge := strings.Repeat("-", length)
c := (length-len(title)-3)/2 + 1 + len(title) - 3
table.WriteString(fmt.Sprintf("[%s]\n", m.tag))
table.WriteString(fmt.Sprintf("%s\n---%*s%*s\n%s\n", edge, c, title, length-(c+len(title))+6+1, "---", edge))
table.WriteString(records.String())
table.WriteString(edge)
log.Printf(table.String())
}

View file

@ -0,0 +1,40 @@
package httpx
import (
"net"
"strconv"
)
// buildAddress joins network host from the first param,
// zone from the second, and
// the port value of a listener from the third param.
//
// As example, address host.com:8080 and listener 123.123.123.123:8888 will be
// transformed to host.com:8888.
func buildAddress(address string, zone string, l Listener) string {
addr, _, err := net.SplitHostPort(address)
if err != nil {
addr = address
}
if addr == "" {
addr = "localhost"
}
port := l.GetPort()
if port > 0 && port != 80 && port != 443 {
addr += ":" + strconv.Itoa(port)
}
if zone != "" {
addr = zone + "." + addr
}
return addr
}
func extractHost(address string) string {
addr, _, err := net.SplitHostPort(address)
if err != nil {
addr = address
}
return addr
}

View file

@ -0,0 +1,47 @@
package httpx
import (
"net"
"testing"
)
type testListener struct {
addr net.TCPAddr
}
func (tl testListener) Accept() (net.Conn, error) { return nil, nil }
func (tl testListener) Close() error { return nil }
func (tl testListener) Addr() net.Addr { return &tl.addr }
func NewTCP(port int) Listener {
return Listener{testListener{addr: net.TCPAddr{Port: port}}}
}
func TestMergeAddresses(t *testing.T) {
tests := []struct {
addr string
zone string
ls Listener
rez string
}{
{addr: "", rez: "localhost"},
{addr: ":", ls: NewTCP(0), rez: "localhost"},
{addr: "", ls: NewTCP(393), rez: "localhost:393"},
{addr: ":8080", ls: NewTCP(8080), rez: "localhost:8080"},
{addr: ":8080", ls: NewTCP(8081), rez: "localhost:8081"},
{addr: "host:8080", ls: NewTCP(8080), rez: "host:8080"},
{addr: "host:8080", ls: NewTCP(8081), rez: "host:8081"},
{addr: "host:8080", zone: "test", ls: NewTCP(8081), rez: "test.host:8081"},
{addr: ":80", ls: NewTCP(80), rez: "localhost"},
{addr: ":", ls: NewTCP(344), rez: "localhost:344"},
{addr: "https://garbage.com:99a9a", rez: "https://garbage.com:99a9a"},
{addr: "[::]", rez: "[::]"},
}
for _, test := range tests {
address := buildAddress(test.addr, test.zone, test.ls)
if address != test.rez {
t.Errorf("expected %v, got %v", test.rez, address)
}
}
}

View file

@ -0,0 +1,79 @@
package httpx
import (
"errors"
"net"
"os"
"runtime"
"strconv"
"syscall"
)
const listenAttempts = 42
type Listener struct {
net.Listener
}
func NewListener(address string, withNextFreePort bool) (*Listener, error) {
listener, err := listener(address, withNextFreePort)
if err != nil {
return nil, err
}
return &Listener{listener}, err
}
func listener(address string, withNextFreePort bool) (net.Listener, error) {
listener, err := net.Listen("tcp", address)
if err == nil || !withNextFreePort || !isPortBusyError(err) {
return listener, err
}
// we will roll next available port
host, prt, err := net.SplitHostPort(address)
if err != nil {
return listener, err
}
// it should be impossible to get 0 port here
// or that's going to break otherwise
port, err := strconv.Atoi(prt)
if err != nil {
return listener, err
}
for i := port + 1; i < port+listenAttempts; i++ {
listener, err := net.Listen("tcp", host+":"+strconv.Itoa(i))
if err == nil {
return listener, nil
}
}
return nil, errors.New("no available ports")
}
func (l Listener) GetPort() int {
if l.Listener == nil {
return 0
}
tcp, ok := l.Addr().(*net.TCPAddr)
if ok && tcp != nil {
return tcp.Port
}
return 0
}
func isPortBusyError(err error) bool {
var eOsSyscall *os.SyscallError
if !errors.As(err, &eOsSyscall) {
return false
}
var errErrno syscall.Errno
if !errors.As(eOsSyscall, &errErrno) {
return false
}
if errErrno == syscall.EADDRINUSE {
return true
}
const WSAEADDRINUSE = 10048
if runtime.GOOS == "windows" && errErrno == WSAEADDRINUSE {
return true
}
return false
}

View file

@ -0,0 +1,85 @@
package httpx
import (
"net"
"strings"
"testing"
)
func TestListenerCreation(t *testing.T) {
tests := []struct {
addr string
port string
random bool
error bool
}{
{addr: ":80", port: "80"},
{addr: ":", random: true},
{addr: ":0", random: true},
{addr: "", random: true},
{addr: "https://garbage.com:99a9a", error: true},
{addr: ":8082", port: "8082"},
{addr: "localhost:8888", port: "8888"},
{addr: "localhost:abc1", error: true},
}
for _, test := range tests {
ls, err := NewListener(test.addr, false)
if test.error {
if err == nil {
t.Errorf("expected error, but got none")
}
continue
}
if !test.error && err != nil {
t.Errorf("unexpected error %v", err)
continue
}
defer ls.Close()
addr := ls.Addr().(*net.TCPAddr)
port := ls.GetPort()
hasPort := port > 0
isPortSame := strings.HasSuffix(addr.String(), ":"+test.port)
if test.random {
if !hasPort {
t.Errorf("expected a random port, got %v", port)
}
continue
}
if !isPortSame {
t.Errorf("expected the same port %v != %v", test.port, port)
}
}
}
func TestFailOnPortInUse(t *testing.T) {
a, err := NewListener(":3333", false)
if err != nil {
t.Errorf("expected no error, got %v", err)
}
defer a.Close()
_, err = NewListener(":3333", false)
if err == nil {
t.Errorf("expected busy port error, but got none")
}
}
func TestListenerPortRoll(t *testing.T) {
a, err := NewListener("127.0.0.1:3333", false)
if err != nil {
t.Errorf("expected no error, got %v", err)
}
defer a.Close()
b, err := NewListener("127.0.0.1:3333", true)
if err != nil {
t.Errorf("expected no port error, but got %v", err)
}
b.Close()
}

View file

@ -0,0 +1,56 @@
package httpx
import (
"time"
"github.com/giongto35/cloud-game/v2/pkg/config/shared"
)
type (
Options struct {
Https bool
HttpsRedirect bool
HttpsRedirectAddress string
HttpsCert string
HttpsKey string
HttpsDomain string
PortRoll bool
IdleTimeout time.Duration
ReadTimeout time.Duration
WriteTimeout time.Duration
Zone string
}
Option func(*Options)
)
func (o *Options) override(options ...Option) {
for _, opt := range options {
opt(o)
}
}
func (o *Options) IsAutoHttpsCert() bool { return !(o.HttpsCert != "" && o.HttpsKey != "") }
func HttpsRedirect(redirect bool) Option {
return func(opts *Options) { opts.HttpsRedirect = redirect }
}
//func Https(is bool) Option { return func(opts *Options) { opts.Https = is } }
//func HttpsCert(cert string) Option { return func(opts *Options) { opts.HttpsCert = cert } }
//func HttpsKey(key string) Option { return func(opts *Options) { opts.HttpsKey = key } }
//func HttpsDomain(domain string) Option { return func(opts *Options) { opts.HttpsDomain = domain } }
//func IdleTimeout(t time.Duration) Option { return func(opts *Options) { opts.IdleTimeout = t } }
//func ReadTimeout(t time.Duration) Option { return func(opts *Options) { opts.ReadTimeout = t } }
//func WriteTimeout(t time.Duration) Option { return func(opts *Options) { opts.WriteTimeout = t } }
func WithPortRoll(roll bool) Option { return func(opts *Options) { opts.PortRoll = roll } }
func WithZone(zone string) Option { return func(opts *Options) { opts.Zone = zone } }
func WithServerConfig(conf shared.Server) Option {
return func(opts *Options) {
opts.Https = conf.Https
opts.HttpsCert = conf.Tls.HttpsCert
opts.HttpsKey = conf.Tls.HttpsKey
opts.HttpsDomain = conf.Tls.Domain
opts.HttpsRedirectAddress = conf.Address
}
}

136
pkg/network/httpx/server.go Normal file
View file

@ -0,0 +1,136 @@
package httpx
import (
"context"
"log"
"net/http"
"net/url"
"time"
"github.com/giongto35/cloud-game/v2/pkg/service"
"golang.org/x/crypto/acme/autocert"
)
type Server struct {
http.Server
service.RunnableService
autoCert *autocert.Manager
opts Options
listener *Listener
redirect *Server
}
func NewServer(address string, handler func(*Server) http.Handler, options ...Option) (*Server, error) {
opts := &Options{
Https: false,
HttpsRedirect: true,
IdleTimeout: 120 * time.Second,
ReadTimeout: 5 * time.Second,
WriteTimeout: 5 * time.Second,
}
opts.override(options...)
server := &Server{
Server: http.Server{
Addr: address,
IdleTimeout: opts.IdleTimeout,
ReadTimeout: opts.ReadTimeout,
WriteTimeout: opts.WriteTimeout,
},
opts: *opts,
}
// (╯°□°)╯︵ ┻━┻
server.Handler = handler(server)
if opts.Https && opts.IsAutoHttpsCert() {
server.autoCert = NewTLSConfig(opts.HttpsDomain).CertManager
server.TLSConfig = server.autoCert.TLSConfig()
}
addr := server.Addr
if server.Addr == "" {
addr = ":http"
if opts.Https {
addr = ":https"
}
log.Printf("Warning! Empty server address has been changed to %v", server.Addr)
}
listener, err := NewListener(addr, server.opts.PortRoll)
if err != nil {
return nil, err
}
server.listener = listener
addr = buildAddress(server.Addr, opts.Zone, *listener)
log.Printf("[server] address was set to %v (%v)", addr, server.Addr)
server.Addr = addr
return server, nil
}
func (s *Server) Run() {
protocol := s.GetProtocol()
log.Printf("Starting %s server on %s", protocol, s.Addr)
if s.opts.Https && s.opts.HttpsRedirect {
rdr, err := s.redirection()
if err != nil {
log.Fatalf("couldn't init redirection server: %v", err)
}
s.redirect = rdr
go s.redirect.Run()
}
var err error
if s.opts.Https {
err = s.ServeTLS(*s.listener, s.opts.HttpsCert, s.opts.HttpsKey)
} else {
err = s.Serve(*s.listener)
}
switch err {
case http.ErrServerClosed:
log.Printf("%s server was closed", protocol)
return
default:
log.Printf("error: %s", err)
}
}
func (s *Server) Shutdown(ctx context.Context) (err error) {
if s.redirect != nil {
err = s.redirect.Shutdown(ctx)
}
err = s.Server.Shutdown(ctx)
return
}
func (s *Server) GetHost() string { return extractHost(s.Addr) }
func (s *Server) GetProtocol() string {
protocol := "http"
if s.opts.Https {
protocol = "https"
}
return protocol
}
func (s *Server) redirection() (*Server, error) {
srv, err := NewServer(s.opts.HttpsRedirectAddress, func(serv *Server) http.Handler {
h := http.NewServeMux()
h.Handle("/", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
httpsURL := url.URL{Scheme: "https", Host: s.Addr, Path: r.URL.Path, RawQuery: r.URL.RawQuery}
rdr := httpsURL.String()
log.Printf("Redirect: http://%s%s -> %s", r.Host, r.URL.String(), rdr)
http.Redirect(w, r, rdr, http.StatusFound)
}))
// do we need this after all?
if serv.autoCert != nil {
return serv.autoCert.HTTPHandler(h)
}
return h
})
log.Printf("Starting HTTP->HTTPS redirection server on %s", srv.Addr)
return srv, err
}

21
pkg/network/httpx/tls.go Normal file
View file

@ -0,0 +1,21 @@
package httpx
import (
"golang.org/x/crypto/acme"
"golang.org/x/crypto/acme/autocert"
)
type TLS struct {
CertManager *autocert.Manager
}
func NewTLSConfig(domain string) *TLS {
return &TLS{
CertManager: &autocert.Manager{
Prompt: autocert.AcceptTOS,
HostPolicy: autocert.HostWhitelist(domain),
Cache: autocert.DirCache("assets/cache"),
Client: &acme.Client{DirectoryURL: acme.LetsEncryptURL},
},
}
}

View file

@ -0,0 +1,17 @@
package websocket
import (
"crypto/tls"
"net/url"
"github.com/gorilla/websocket"
)
func Connect(address url.URL) (*websocket.Conn, error) {
dialer := websocket.Dialer{}
if address.Scheme == "wss" {
dialer.TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
}
ws, _, err := dialer.Dial(address.String(), nil)
return ws, err
}

23
pkg/os/os.go Normal file
View file

@ -0,0 +1,23 @@
package os
import (
"os"
"os/signal"
"syscall"
)
type Signal struct {
event chan os.Signal
done chan struct{}
}
func ExpectTermination() chan struct{} {
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt, syscall.SIGTERM)
done := make(chan struct{}, 1)
go func() {
<-signals
done <- struct{}{}
}()
return done
}

View file

@ -1,9 +0,0 @@
package server
import "context"
type Server interface {
Init(conf interface{}) error
Run() error
Shutdown(ctx context.Context) error
}

48
pkg/service/service.go Normal file
View file

@ -0,0 +1,48 @@
package service
import (
"context"
"log"
)
// Service defines a generic service.
type Service interface{}
// RunnableService defines a service that can be run.
type RunnableService interface {
Service
Run()
Shutdown(ctx context.Context) error
}
// Group is a container for managing a bunch of services.
type Group struct {
list []Service
}
func (g *Group) Add(services ...Service) {
for _, s := range services {
g.list = append(g.list, s)
}
}
// Start starts each service in the group.
func (g *Group) Start() {
for _, s := range g.list {
if v, ok := s.(RunnableService); ok {
go v.Run()
}
}
}
// Shutdown terminates a group of services.
func (g *Group) Shutdown(ctx context.Context) {
for _, s := range g.list {
if v, ok := s.(RunnableService); ok {
if err := v.Shutdown(ctx); err != nil && err != context.Canceled {
log.Printf("error: failed to stop [%s] because of %v", s, err)
}
}
}
}

View file

@ -1,7 +1,9 @@
package worker
import (
"crypto/tls"
"context"
"encoding/base64"
"encoding/json"
"log"
"net/url"
"os"
@ -10,20 +12,21 @@ import (
"github.com/giongto35/cloud-game/v2/pkg/config/worker"
"github.com/giongto35/cloud-game/v2/pkg/cws/api"
"github.com/giongto35/cloud-game/v2/pkg/emulator/libretro/manager/remotehttp"
"github.com/giongto35/cloud-game/v2/pkg/environment"
"github.com/giongto35/cloud-game/v2/pkg/games"
"github.com/giongto35/cloud-game/v2/pkg/network/websocket"
"github.com/giongto35/cloud-game/v2/pkg/service"
"github.com/giongto35/cloud-game/v2/pkg/webrtc"
storage "github.com/giongto35/cloud-game/v2/pkg/worker/cloud-storage"
"github.com/giongto35/cloud-game/v2/pkg/worker/room"
"github.com/gorilla/websocket"
)
type Handler struct {
service.RunnableService
address string
// Client that connects to coordinator
oClient *CoordinatorClient
// Raw address of coordinator
coordinatorHost string
cfg worker.Config
cfg worker.Config
// Rooms map : RoomID -> Room
rooms map[string]*room.Room
// global ID of the current server
@ -32,38 +35,34 @@ type Handler struct {
onlineStorage *storage.Client
// sessions handles all sessions server is handler (key is sessionID)
sessions map[string]*Session
w *Worker
}
// NewHandler returns a new server
func NewHandler(cfg worker.Config, wrk *Worker) *Handler {
func NewHandler(conf worker.Config, address string) *Handler {
// Create offline storage folder
createOfflineStorage(cfg.Emulator.Storage)
createOfflineStorage(conf.Emulator.Storage)
// Init online storage
onlineStorage := storage.NewInitClient()
return &Handler{
rooms: map[string]*room.Room{},
sessions: map[string]*Session{},
coordinatorHost: cfg.Worker.Network.CoordinatorAddress,
cfg: cfg,
onlineStorage: onlineStorage,
w: wrk,
address: address,
cfg: conf,
onlineStorage: onlineStorage,
rooms: map[string]*room.Room{},
sessions: map[string]*Session{},
}
}
// Run starts a Handler running logic
func (h *Handler) Run() {
conf := h.cfg.Worker.Network
coordinatorAddress := h.cfg.Worker.Network.CoordinatorAddress
for {
conn, err := setupCoordinatorConnection(conf.CoordinatorAddress, conf.Zone, h.cfg)
conn, err := newCoordinatorConnection(coordinatorAddress, h.cfg.Worker, h.address)
if err != nil {
log.Printf("Cannot connect to coordinator. %v Retrying...", err)
time.Sleep(time.Second)
continue
}
log.Printf("[worker] connected to: %v", conf.CoordinatorAddress)
log.Printf("[worker] connected to: %v", coordinatorAddress)
h.oClient = conn
go h.oClient.Heartbeat()
@ -73,13 +72,7 @@ func (h *Handler) Run() {
}
}
func (h *Handler) RequestConfig() {
log.Printf("[worker] asking for a config...")
response := h.oClient.SyncSend(api.ConfigPacket())
conf := worker.EmptyConfig()
conf.Deserialize([]byte(response.Data))
log.Printf("[worker] pulled config: %+v", conf)
}
func (h *Handler) Shutdown(context.Context) error { return nil }
func (h *Handler) Prepare() {
if !h.cfg.Emulator.Libretro.Cores.Repo.Sync {
@ -99,39 +92,36 @@ func (h *Handler) Prepare() {
}
}
func setupCoordinatorConnection(host string, zone string, cfg worker.Config) (*CoordinatorClient, error) {
var scheme string
env := cfg.Environment.Get()
if env.AnyOf(environment.Production, environment.Staging) {
func newCoordinatorConnection(host string, conf worker.Worker, addr string) (*CoordinatorClient, error) {
scheme := "ws"
if conf.Network.Secure {
scheme = "wss"
} else {
scheme = "ws"
}
address := url.URL{Scheme: scheme, Host: host, Path: conf.Network.Endpoint}
req, err := MakeConnectionRequest(conf, addr)
if req != "" && err == nil {
address.RawQuery = "data=" + req
}
coordinatorURL := url.URL{Scheme: scheme, Host: host, Path: "/wso", RawQuery: "zone=" + zone}
log.Println("Worker connecting to coordinator:", coordinatorURL.String())
conn, err := createCoordinatorConnection(&coordinatorURL)
conn, err := websocket.Connect(address)
if err != nil {
return nil, err
}
return NewCoordinatorClient(conn), nil
}
func createCoordinatorConnection(url *url.URL) (*websocket.Conn, error) {
var d websocket.Dialer
if url.Scheme == "wss" {
d = websocket.Dialer{TLSClientConfig: &tls.Config{InsecureSkipVerify: true}}
} else {
d = websocket.Dialer{}
func MakeConnectionRequest(conf worker.Worker, address string) (string, error) {
req := api.ConnectionRequest{
Zone: conf.Network.Zone,
PingAddr: conf.GetPingAddr(address),
IsHTTPS: conf.Server.Https,
}
ws, _, err := d.Dial(url.String(), nil)
rez, err := json.Marshal(req)
if err != nil {
return nil, err
return "", err
}
return ws, nil
return base64.URLEncoding.EncodeToString(rez), nil
}
func (h *Handler) GetCoordinatorClient() *CoordinatorClient {

31
pkg/worker/http.go Normal file
View file

@ -0,0 +1,31 @@
package worker
import (
"net/http"
"github.com/giongto35/cloud-game/v2/pkg/config/worker"
"github.com/giongto35/cloud-game/v2/pkg/network/httpx"
)
func NewHTTPServer(conf worker.Config) (*httpx.Server, error) {
srv, err := httpx.NewServer(
conf.Worker.GetAddr(),
func(*httpx.Server) http.Handler {
h := http.NewServeMux()
h.HandleFunc(conf.Worker.Network.PingEndpoint, func(w http.ResponseWriter, _ *http.Request) {
w.Header().Set("Access-Control-Allow-Origin", "*")
_, _ = w.Write([]byte{0x65, 0x63, 0x68, 0x6f}) // echo
})
return h
},
httpx.WithServerConfig(conf.Worker.Server),
// no need just for one route
httpx.HttpsRedirect(false),
httpx.WithPortRoll(true),
httpx.WithZone(conf.Worker.Network.Zone),
)
if err != nil {
return nil, err
}
return srv, nil
}

View file

@ -16,8 +16,6 @@ func (h *Handler) handleServerId() cws.PacketHandler {
return func(resp cws.WSPacket) (req cws.WSPacket) {
log.Printf("[worker] new id: %s", resp.Data)
h.serverID = resp.Data
// unlock worker if it's locked
h.w.lock.Unlock()
return
}
}

View file

@ -1,123 +0,0 @@
package worker
import (
"crypto/tls"
"fmt"
"log"
"net/http"
"strconv"
"time"
"github.com/giongto35/cloud-game/v2/pkg/environment"
"golang.org/x/crypto/acme"
"golang.org/x/crypto/acme/autocert"
)
const stagingLEURL = "https://acme-staging-v02.api.letsencrypt.org/directory"
var echo = []byte{0x65, 0x63, 0x68, 0x6f}
func makeServerFromMux(mux *http.ServeMux) *http.Server {
// set timeouts so that a slow or malicious client doesn't
// hold resources forever
return &http.Server{
ReadTimeout: 5 * time.Second,
WriteTimeout: 5 * time.Second,
IdleTimeout: 120 * time.Second,
Handler: mux,
}
}
func makeHTTPServer() *http.Server {
mux := &http.ServeMux{}
mux.HandleFunc("/echo", func(w http.ResponseWriter, _ *http.Request) {
w.Header().Set("Access-Control-Allow-Origin", "*")
_, _ = w.Write(echo)
})
return makeServerFromMux(mux)
}
func makeHTTPToHTTPSRedirectServer() *http.Server {
handleRedirect := func(w http.ResponseWriter, r *http.Request) {
newURI := "https://" + r.Host + r.URL.String()
http.Redirect(w, r, newURI, http.StatusFound)
}
mux := &http.ServeMux{}
mux.HandleFunc("/", handleRedirect)
return makeServerFromMux(mux)
}
func (wrk *Worker) spawnServer(port int) {
var certManager *autocert.Manager
var httpsSrv *http.Server
mode := wrk.conf.Environment.Get()
if mode.AnyOf(environment.Production, environment.Staging) {
serverConfig := wrk.conf.Worker.Server
httpsSrv = makeHTTPServer()
httpsSrv.Addr = fmt.Sprintf(":%d", serverConfig.HttpsPort)
if serverConfig.HttpsChain == "" || serverConfig.HttpsKey == "" {
serverConfig.HttpsChain = ""
serverConfig.HttpsKey = ""
var leurl string
if mode == environment.Staging {
leurl = stagingLEURL
} else {
leurl = acme.LetsEncryptURL
}
certManager = &autocert.Manager{
Prompt: autocert.AcceptTOS,
Cache: autocert.DirCache("assets/cache"),
Client: &acme.Client{DirectoryURL: leurl},
}
httpsSrv.TLSConfig = &tls.Config{GetCertificate: certManager.GetCertificate}
}
go func(chain string, key string) {
log.Printf("Starting HTTPS server on %s\n", httpsSrv.Addr)
err := httpsSrv.ListenAndServeTLS(chain, key)
if err != nil {
log.Printf("httpsSrv.ListendAndServeTLS() failed with %s", err)
}
}(serverConfig.HttpsChain, serverConfig.HttpsKey)
}
var httpSrv *http.Server
if mode.AnyOf(environment.Production, environment.Staging) {
httpSrv = makeHTTPToHTTPSRedirectServer()
} else {
httpSrv = makeHTTPServer()
}
if certManager != nil {
httpSrv.Handler = certManager.HTTPHandler(httpSrv.Handler)
}
startServer(httpSrv, port)
}
func startServer(serv *http.Server, startPort int) {
// It's recommend to run one worker on one instance.
// This logic is to make sure more than 1 workers still work
for port, n := startPort, startPort+100; port < n; port++ {
serv.Addr = ":" + strconv.Itoa(port)
err := serv.ListenAndServe()
switch err {
case http.ErrServerClosed:
log.Printf("HTTP(S) server was closed")
return
default:
}
port++
if port == n {
log.Printf("error: couldn't find an open port in range %v-%v\n", startPort, port)
}
}
}

View file

@ -1,74 +1,24 @@
package worker
import (
"context"
"log"
"time"
"github.com/giongto35/cloud-game/v2/pkg/config/worker"
"github.com/giongto35/cloud-game/v2/pkg/lock"
"github.com/giongto35/cloud-game/v2/pkg/monitoring"
"github.com/giongto35/cloud-game/v2/pkg/server"
"github.com/golang/glog"
"github.com/giongto35/cloud-game/v2/pkg/service"
)
type Worker struct {
ctx context.Context
conf worker.Config
servers []server.Server
// to pause initialization
lock *lock.TimeLock
}
func New(ctx context.Context, conf worker.Config) *Worker {
return &Worker{ctx: ctx, conf: conf, lock: lock.NewLock()}
}
func (wrk *Worker) Run() {
go wrk.init()
wrk.servers = []server.Server{
monitoring.NewServerMonitoring(wrk.conf.Worker.Monitoring, "worker"),
func New(conf worker.Config) (services service.Group) {
httpSrv, err := NewHTTPServer(conf)
if err != nil {
log.Fatalf("http init fail: %v", err)
}
wrk.startModules()
}
func (wrk *Worker) init() {
h := NewHandler(wrk.conf, wrk)
defer func() {
log.Printf("[worker] Closing handler")
h.Close()
}()
go h.Run()
if !wrk.conf.Loaded {
wrk.lock.LockFor(time.Second * 10)
h.RequestConfig()
}
h.Prepare()
wrk.spawnServer(wrk.conf.Worker.Server.Port)
}
func (wrk *Worker) startModules() {
glog.Info("[worker] active modules: ", wrk.servers)
for _, s := range wrk.servers {
s := s
go func() {
if err := s.Init(wrk.conf); err != nil {
glog.Errorf("failed server init")
return
}
if err := s.Run(); err != nil {
glog.Errorf("failed start server: [%v]", err)
}
}()
}
}
// !to add a proper HTTP(S) server shutdown (cws/handler bad loop)
func (wrk *Worker) Shutdown() {
for _, s := range wrk.servers {
if err := s.Shutdown(wrk.ctx); err != nil {
glog.Errorln("failed server shutdown")
}
services.Add(
httpSrv,
NewHandler(conf, httpSrv.Addr),
)
if conf.Worker.Monitoring.IsEnabled() {
services.Add(monitoring.New(conf.Worker.Monitoring, httpSrv.GetHost(), "worker"))
}
return
}