From f63552fa09575d46dcc45c7c7454edfc884b43f8 Mon Sep 17 00:00:00 2001 From: milarin Date: Thu, 19 Jan 2023 12:05:46 +0100 Subject: [PATCH] graceful shutdown on SIGTERM or SIGINT --- main.go | 33 +++++++++++++++++++++++++++++++-- telegram.go | 11 +++++++++++ watch_directory.go | 12 +++++++++++- 3 files changed, 53 insertions(+), 3 deletions(-) diff --git a/main.go b/main.go index f6bf768..6fc2530 100644 --- a/main.go +++ b/main.go @@ -1,16 +1,35 @@ package main import ( + "context" + "os" "os/exec" + "os/signal" + "sync" + "syscall" "git.milar.in/milarin/channel" "git.milar.in/nyaanime/logic" "github.com/fsnotify/fsnotify" ) -var Runner = InitializeRunner() +var ( + // AppCtx notifies all threads to finish their work and shutdown + AppCtx, cancelAppCtx = context.WithCancel(context.Background()) + // AppExitWg is waiting for all threads until they are done + AppExitWg = &sync.WaitGroup{} + + Runner = InitializeRunner() +) func main() { + signalChan := make(chan os.Signal, 1) + signal.Notify(signalChan, syscall.SIGTERM, syscall.SIGINT) + go func() { + <-signalChan + exit() + }() + // check for ffprobe in PATH if _, err := exec.LookPath("ffprobe"); err != nil { panic(err) // TODO error handling @@ -35,10 +54,20 @@ func main() { fileHandleChan := channel.Map(fileChan, NewFileHandle) workChan, logChan := channel.Tee(fileHandleChan) - go channel.Each(workChan, HandleFileInRunner) + AppExitWg.Add(1) + go func() { + defer AppExitWg.Done() + channel.Each(workChan, HandleFileInRunner) + }() + channel.Each(logChan, PrintFileHandle) } func HandleFileInRunner(fh *FileHandle) { Runner.Run(func() { HandleFile(fh) }) } + +func exit() { + cancelAppCtx() // notify all threads to shutdown + AppExitWg.Wait() // wait for threads until shutdown +} diff --git a/telegram.go b/telegram.go index e77f7c2..bb43ac8 100644 --- a/telegram.go +++ b/telegram.go @@ -24,11 +24,22 @@ func InitTelegramBot() error { } TelegramBot = bot } + + // close channel on app shutdown + go func() { + <-AppCtx.Done() + close(AnimeEpisodeMessageChannel) + }() + + AppExitWg.Add(1) go SendMessagePeriodically() + return nil } func SendMessagePeriodically() { + defer AppExitWg.Done() + var messagesPerInterval <-chan []model.AnimeEpisode if TelegramOrganizeMessageSendInterval > 0 { diff --git a/watch_directory.go b/watch_directory.go index 10372cb..671b9d7 100644 --- a/watch_directory.go +++ b/watch_directory.go @@ -26,7 +26,15 @@ func WatchDirectory(op fsnotify.Op, path string) (<-chan string, error) { ch := make(chan string, 10) + // close channel on app shutdown + go func() { + <-AppCtx.Done() + watcher.Close() + }() + + AppExitWg.Add(1) go func(watcher *fsnotify.Watcher, ch chan<- string) { + defer AppExitWg.Done() defer watcher.Close() for _, file := range files { @@ -74,7 +82,9 @@ func WatchDirectory(op fsnotify.Op, path string) (<-chan string, error) { ch <- event.Name } case err, ok := <-watcher.Errors: - fmt.Println(err, ok) + if ok { + fmt.Println(err, ok) + } close(ch) return }