* Add a storage layer for attachments * Fix some bug * fix test * Fix copyright head and lint * Fix bug * Add setting for minio and flags for migrate-storage * Add documents * fix lint * Add test for minio store type on attachments * fix test * fix test * Apply suggestions from code review Co-authored-by: guillep2k <18600385+guillep2k@users.noreply.github.com> * Add warning when storage migrated successfully * Fix drone * fix test * rebase * Fix test * display the error on console * Move minio test to amd64 since minio docker don't support arm64 * refactor the codes * add trace * Fix test * remove log on xorm * Fi download bug * Add a storage layer for attachments * Add setting for minio and flags for migrate-storage * fix lint * Add test for minio store type on attachments * Apply suggestions from code review Co-authored-by: guillep2k <18600385+guillep2k@users.noreply.github.com> * Fix drone * fix test * Fix test * display the error on console * Move minio test to amd64 since minio docker don't support arm64 * refactor the codes * add trace * Fix test * Add URL function to serve attachments directly from S3/Minio * Add ability to enable/disable redirection in attachment configuration * Fix typo * Add a storage layer for attachments * Add setting for minio and flags for migrate-storage * fix lint * Add test for minio store type on attachments * Apply suggestions from code review Co-authored-by: guillep2k <18600385+guillep2k@users.noreply.github.com> * Fix drone * fix test * Fix test * display the error on console * Move minio test to amd64 since minio docker don't support arm64 * don't change unrelated files * Fix lint * Fix build * update go.mod and go.sum * Use github.com/minio/minio-go/v6 * Remove unused function * Upgrade minio to v7 and some other improvements * fix lint * Fix go mod Co-authored-by: guillep2k <18600385+guillep2k@users.noreply.github.com> Co-authored-by: Tyler <tystuyfzand@gmail.com>
		
			
				
	
	
		
			120 lines
		
	
	
		
			3.6 KiB
		
	
	
	
		
			Go
		
	
	
	
		
			Vendored
		
	
	
	
			
		
		
	
	
			120 lines
		
	
	
		
			3.6 KiB
		
	
	
	
		
			Go
		
	
	
	
		
			Vendored
		
	
	
	
package concurrent
 | 
						|
 | 
						|
import (
 | 
						|
	"context"
 | 
						|
	"fmt"
 | 
						|
	"runtime"
 | 
						|
	"runtime/debug"
 | 
						|
	"sync"
 | 
						|
	"time"
 | 
						|
	"reflect"
 | 
						|
)
 | 
						|
 | 
						|
// HandlePanic logs goroutine panic by default
 | 
						|
var HandlePanic = func(recovered interface{}, funcName string) {
 | 
						|
	ErrorLogger.Println(fmt.Sprintf("%s panic: %v", funcName, recovered))
 | 
						|
	ErrorLogger.Println(string(debug.Stack()))
 | 
						|
}
 | 
						|
 | 
						|
// UnboundedExecutor starts goroutines without any cap on how many may be
// alive at once. It keeps a count of the goroutines it started so that they
// can be cancelled, and optionally waited for, on shutdown.
type UnboundedExecutor struct {
	// ctx is handed to every handler started through Go.
	ctx context.Context
	// cancel cancels ctx; Stop and StopAndWait call it.
	cancel context.CancelFunc
	// activeGoroutinesMutex guards activeGoroutines.
	activeGoroutinesMutex *sync.Mutex
	// activeGoroutines counts running goroutines, keyed by the "file:line"
	// of the handler function they were started from.
	activeGoroutines map[string]int
	// HandlePanic, when non-nil, is called instead of the package-level
	// HandlePanic for panics recovered from handlers of this executor.
	HandlePanic func(recovered interface{}, funcName string)
}
 | 
						|
 | 
						|
// GlobalUnboundedExecutor has the life cycle of the program itself
 | 
						|
// any goroutine want to be shutdown before main exit can be started from this executor
 | 
						|
// GlobalUnboundedExecutor expects the main function to call stop
 | 
						|
// it does not magically knows the main function exits
 | 
						|
var GlobalUnboundedExecutor = NewUnboundedExecutor()
 | 
						|
 | 
						|
// NewUnboundedExecutor creates a new UnboundedExecutor,
 | 
						|
// UnboundedExecutor can not be created by &UnboundedExecutor{}
 | 
						|
// HandlePanic can be set with a callback to override global HandlePanic
 | 
						|
func NewUnboundedExecutor() *UnboundedExecutor {
 | 
						|
	ctx, cancel := context.WithCancel(context.TODO())
 | 
						|
	return &UnboundedExecutor{
 | 
						|
		ctx:                   ctx,
 | 
						|
		cancel:                cancel,
 | 
						|
		activeGoroutinesMutex: &sync.Mutex{},
 | 
						|
		activeGoroutines:      map[string]int{},
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// Go starts a new goroutine and tracks its lifecycle.
 | 
						|
// Panic will be recovered and logged automatically, except for StopSignal
 | 
						|
func (executor *UnboundedExecutor) Go(handler func(ctx context.Context)) {
 | 
						|
	pc := reflect.ValueOf(handler).Pointer()
 | 
						|
	f := runtime.FuncForPC(pc)
 | 
						|
	funcName := f.Name()
 | 
						|
	file, line := f.FileLine(pc)
 | 
						|
	executor.activeGoroutinesMutex.Lock()
 | 
						|
	defer executor.activeGoroutinesMutex.Unlock()
 | 
						|
	startFrom := fmt.Sprintf("%s:%d", file, line)
 | 
						|
	executor.activeGoroutines[startFrom] += 1
 | 
						|
	go func() {
 | 
						|
		defer func() {
 | 
						|
			recovered := recover()
 | 
						|
			// if you want to quit a goroutine without trigger HandlePanic
 | 
						|
			// use runtime.Goexit() to quit
 | 
						|
			if recovered != nil {
 | 
						|
				if executor.HandlePanic == nil {
 | 
						|
					HandlePanic(recovered, funcName)
 | 
						|
				} else {
 | 
						|
					executor.HandlePanic(recovered, funcName)
 | 
						|
				}
 | 
						|
			}
 | 
						|
			executor.activeGoroutinesMutex.Lock()
 | 
						|
			executor.activeGoroutines[startFrom] -= 1
 | 
						|
			executor.activeGoroutinesMutex.Unlock()
 | 
						|
		}()
 | 
						|
		handler(executor.ctx)
 | 
						|
	}()
 | 
						|
}
 | 
						|
 | 
						|
// Stop cancel all goroutines started by this executor without wait
 | 
						|
func (executor *UnboundedExecutor) Stop() {
 | 
						|
	executor.cancel()
 | 
						|
}
 | 
						|
 | 
						|
// StopAndWaitForever cancel all goroutines started by this executor and
 | 
						|
// wait until all goroutines exited
 | 
						|
func (executor *UnboundedExecutor) StopAndWaitForever() {
 | 
						|
	executor.StopAndWait(context.Background())
 | 
						|
}
 | 
						|
 | 
						|
// StopAndWait cancel all goroutines started by this executor and wait.
 | 
						|
// Wait can be cancelled by the context passed in.
 | 
						|
func (executor *UnboundedExecutor) StopAndWait(ctx context.Context) {
 | 
						|
	executor.cancel()
 | 
						|
	for {
 | 
						|
		oneHundredMilliseconds := time.NewTimer(time.Millisecond * 100)
 | 
						|
		select {
 | 
						|
		case <-oneHundredMilliseconds.C:
 | 
						|
			if executor.checkNoActiveGoroutines() {
 | 
						|
				return
 | 
						|
			}
 | 
						|
		case <-ctx.Done():
 | 
						|
			return
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (executor *UnboundedExecutor) checkNoActiveGoroutines() bool {
 | 
						|
	executor.activeGoroutinesMutex.Lock()
 | 
						|
	defer executor.activeGoroutinesMutex.Unlock()
 | 
						|
	for startFrom, count := range executor.activeGoroutines {
 | 
						|
		if count > 0 {
 | 
						|
			InfoLogger.Println("UnboundedExecutor is still waiting goroutines to quit",
 | 
						|
				"startFrom", startFrom,
 | 
						|
				"count", count)
 | 
						|
			return false
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return true
 | 
						|
}
 |