package splitstore

import (
	"bufio"
	"io"
	"os"
	"path/filepath"
	"sync"

	"github.com/ipfs/go-cid"
	"golang.org/x/xerrors"
)

type MapMarkSetEnv struct {
	path string
}

var _ MarkSetEnv = (*MapMarkSetEnv)(nil)

type MapMarkSet struct {
	mx  sync.RWMutex
	set map[string]struct{}

	persist bool
	file    *os.File
	buf     *bufio.Writer

	path string
}

var _ MarkSet = (*MapMarkSet)(nil)

func NewMapMarkSetEnv(path string) (*MapMarkSetEnv, error) {
	msPath := filepath.Join(path, "markset.map")
	err := os.MkdirAll(msPath, 0755) //nolint:gosec
	if err != nil {
		return nil, xerrors.Errorf("error creating markset directory: %w", err)
	}

	return &MapMarkSetEnv{path: msPath}, nil
}

func (e *MapMarkSetEnv) New(name string, sizeHint int64) (MarkSet, error) {
	path := filepath.Join(e.path, name)
	return &MapMarkSet{
		set:  make(map[string]struct{}, sizeHint),
		path: path,
	}, nil
}

func (e *MapMarkSetEnv) Recover(name string) (MarkSet, error) {
	path := filepath.Join(e.path, name)
	s := &MapMarkSet{
		set:  make(map[string]struct{}),
		path: path,
	}

	in, err := os.Open(path)
	if err != nil {
		return nil, xerrors.Errorf("error opening markset file for read: %w", err)
	}
	defer in.Close() //nolint:errcheck

	// wrap a buffered reader to make this faster
	buf := bufio.NewReader(in)
	for {
		var sz byte
		if sz, err = buf.ReadByte(); err != nil {
			break
		}

		key := make([]byte, int(sz))
		if _, err = io.ReadFull(buf, key); err != nil {
			break
		}

		s.set[string(key)] = struct{}{}
	}

	if err != io.EOF {
		return nil, xerrors.Errorf("error reading markset file: %w", err)
	}

	file, err := os.OpenFile(s.path, os.O_WRONLY|os.O_APPEND, 0)
	if err != nil {
		return nil, xerrors.Errorf("error opening markset file for write: %w", err)
	}

	s.persist = true
	s.file = file
	s.buf = bufio.NewWriter(file)

	return s, nil
}

func (e *MapMarkSetEnv) Close() error {
	return nil
}

func (s *MapMarkSet) BeginCriticalSection() error {
	s.mx.Lock()
	defer s.mx.Unlock()

	if s.set == nil {
		return errMarkSetClosed
	}

	if s.persist {
		return nil
	}

	file, err := os.OpenFile(s.path, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0644)
	if err != nil {
		return xerrors.Errorf("error opening markset file: %w", err)
	}

	// wrap a buffered writer to make this faster
	s.buf = bufio.NewWriter(file)
	for key := range s.set {
		if err := s.writeKey([]byte(key), false); err != nil {
			_ = file.Close()
			s.buf = nil
			return err
		}
	}
	if err := s.buf.Flush(); err != nil {
		_ = file.Close()
		s.buf = nil
		return xerrors.Errorf("error flushing markset file buffer: %w", err)
	}

	s.file = file
	s.persist = true

	return nil
}

func (s *MapMarkSet) EndCriticalSection() {
	s.mx.Lock()
	defer s.mx.Unlock()

	if !s.persist {
		return
	}

	_ = s.file.Close()
	_ = os.Remove(s.path)
	s.file = nil
	s.buf = nil
	s.persist = false
}

func (s *MapMarkSet) Mark(c cid.Cid) error {
	s.mx.Lock()
	defer s.mx.Unlock()

	if s.set == nil {
		return errMarkSetClosed
	}

	hash := c.Hash()
	s.set[string(hash)] = struct{}{}

	if s.persist {
		if err := s.writeKey(hash, true); err != nil {
			return err
		}

		if err := s.file.Sync(); err != nil {
			return xerrors.Errorf("error syncing markset: %w", err)
		}
	}

	return nil
}

func (s *MapMarkSet) MarkMany(batch []cid.Cid) error {
	s.mx.Lock()
	defer s.mx.Unlock()

	if s.set == nil {
		return errMarkSetClosed
	}

	for _, c := range batch {
		hash := c.Hash()
		s.set[string(hash)] = struct{}{}

		if s.persist {
			if err := s.writeKey(hash, false); err != nil {
				return err
			}
		}
	}

	if s.persist {
		if err := s.buf.Flush(); err != nil {
			return xerrors.Errorf("error flushing markset buffer to disk: %w", err)
		}

		if err := s.file.Sync(); err != nil {
			return xerrors.Errorf("error syncing markset: %w", err)
		}
	}

	return nil
}

func (s *MapMarkSet) Has(cid cid.Cid) (bool, error) {
	s.mx.RLock()
	defer s.mx.RUnlock()

	if s.set == nil {
		return false, errMarkSetClosed
	}

	_, ok := s.set[string(cid.Hash())]
	return ok, nil
}

func (s *MapMarkSet) Visit(c cid.Cid) (bool, error) {
	s.mx.Lock()
	defer s.mx.Unlock()

	if s.set == nil {
		return false, errMarkSetClosed
	}

	hash := c.Hash()
	key := string(hash)
	if _, ok := s.set[key]; ok {
		return false, nil
	}

	s.set[key] = struct{}{}

	if s.persist {
		if err := s.writeKey(hash, true); err != nil {
			return false, err
		}
		if err := s.file.Sync(); err != nil {
			return false, xerrors.Errorf("error syncing markset: %w", err)
		}
	}

	return true, nil
}

func (s *MapMarkSet) Close() error {
	s.mx.Lock()
	defer s.mx.Unlock()

	if s.set == nil {
		return nil
	}

	s.set = nil

	if s.file != nil {
		if err := s.file.Close(); err != nil {
			log.Warnf("error closing markset file: %s", err)
		}

		if !s.persist {
			if err := os.Remove(s.path); err != nil {
				log.Warnf("error removing markset file: %s", err)
			}
		}
	}

	return nil
}

func (s *MapMarkSet) writeKey(k []byte, flush bool) error {
	if err := s.buf.WriteByte(byte(len(k))); err != nil {
		return xerrors.Errorf("error writing markset key length to disk: %w", err)
	}
	if _, err := s.buf.Write(k); err != nil {
		return xerrors.Errorf("error writing markset key to disk: %w", err)
	}
	if flush {
		if err := s.buf.Flush(); err != nil {
			return xerrors.Errorf("error flushing markset buffer to disk: %w", err)
		}
	}

	return nil
}