soft-serve

git clone git://git.lin.moe/fork/soft-serve.git

  1package lfs
  2
  3import (
  4	"bufio"
  5	"bytes"
  6	"context"
  7	"encoding/hex"
  8	"fmt"
  9	"hash"
 10	"io"
 11	"strconv"
 12	"strings"
 13	"sync"
 14
 15	gitm "github.com/aymanbagabas/git-module"
 16	"github.com/charmbracelet/soft-serve/git"
 17)
 18
 19// SearchPointerBlobs scans the whole repository for LFS pointer files
 20func SearchPointerBlobs(ctx context.Context, repo *git.Repository, pointerChan chan<- PointerBlob, errChan chan<- error) {
 21	basePath := repo.Path
 22
 23	catFileCheckReader, catFileCheckWriter := io.Pipe()
 24	shasToBatchReader, shasToBatchWriter := io.Pipe()
 25	catFileBatchReader, catFileBatchWriter := io.Pipe()
 26
 27	wg := sync.WaitGroup{}
 28	wg.Add(6)
 29
 30	// Create the go-routines in reverse order.
 31
 32	// 4. Take the output of cat-file --batch and check if each file in turn
 33	// to see if they're pointers to files in the LFS store
 34	go createPointerResultsFromCatFileBatch(ctx, catFileBatchReader, &wg, pointerChan)
 35
 36	// 3. Take the shas of the blobs and batch read them
 37	go catFileBatch(ctx, shasToBatchReader, catFileBatchWriter, &wg, basePath)
 38
 39	// 2. From the provided objects restrict to blobs <=1k
 40	go blobsLessThan1024FromCatFileBatchCheck(catFileCheckReader, shasToBatchWriter, &wg)
 41
 42	// 1. Run batch-check on all objects in the repository
 43	revListReader, revListWriter := io.Pipe()
 44	shasToCheckReader, shasToCheckWriter := io.Pipe()
 45	go catFileBatchCheck(ctx, shasToCheckReader, catFileCheckWriter, &wg, basePath)
 46	go blobsFromRevListObjects(revListReader, shasToCheckWriter, &wg)
 47	go revListAllObjects(ctx, revListWriter, &wg, basePath, errChan)
 48	wg.Wait()
 49
 50	close(pointerChan)
 51	close(errChan)
 52}
 53
 54func CheckPointerExist(ctx context.Context, repo *git.Repository, pointerChan <-chan PointerBlob) (<-chan bool, func()) {
 55	ctx, cancel := context.WithCancel(ctx)
 56	resultChan := make(chan bool)
 57	wg := sync.WaitGroup{}
 58	wg.Add(3)
 59
 60	stop := func() {
 61		cancel()
 62		wg.Wait()
 63	}
 64
 65	shasToCheckReader, shasToCheckWriter := io.Pipe()
 66	catFileCheckReader, catFileCheckWriter := io.Pipe()
 67
 68	go catFileBatchLineExist(ctx, catFileCheckReader, resultChan, &wg)
 69
 70	go catFileBatchCheck(ctx, shasToCheckReader, catFileCheckWriter, &wg, repo.Path)
 71
 72	go pointerBlobHash(ctx, repo.Hasher(), pointerChan, shasToCheckWriter, &wg)
 73
 74	return resultChan, stop
 75}
 76
 77func createPointerResultsFromCatFileBatch(ctx context.Context, catFileBatchReader *io.PipeReader, wg *sync.WaitGroup, pointerChan chan<- PointerBlob) {
 78	defer wg.Done()
 79	defer catFileBatchReader.Close() //nolint: errcheck
 80
 81	bufferedReader := bufio.NewReader(catFileBatchReader)
 82	buf := make([]byte, 1025)
 83
 84loop:
 85	for {
 86		select {
 87		case <-ctx.Done():
 88			break loop
 89		default:
 90		}
 91
 92		// File descriptor line: sha
 93		sha, err := bufferedReader.ReadString(' ')
 94		if err != nil {
 95			_ = catFileBatchReader.CloseWithError(err)
 96			break
 97		}
 98		sha = strings.TrimSpace(sha)
 99		// Throw away the blob
100		if _, err := bufferedReader.ReadString(' '); err != nil {
101			_ = catFileBatchReader.CloseWithError(err)
102			break
103		}
104		sizeStr, err := bufferedReader.ReadString('\n')
105		if err != nil {
106			_ = catFileBatchReader.CloseWithError(err)
107			break
108		}
109		size, err := strconv.Atoi(sizeStr[:len(sizeStr)-1])
110		if err != nil {
111			_ = catFileBatchReader.CloseWithError(err)
112			break
113		}
114		pointerBuf := buf[:size+1]
115		if _, err := io.ReadFull(bufferedReader, pointerBuf); err != nil {
116			_ = catFileBatchReader.CloseWithError(err)
117			break
118		}
119		pointerBuf = pointerBuf[:size]
120		// Now we need to check if the pointerBuf is an LFS pointer
121		pointer, _ := ReadPointerFromBuffer(pointerBuf)
122		if !pointer.IsValid() {
123			continue
124		}
125
126		pointerChan <- PointerBlob{Hash: sha, Pointer: pointer}
127	}
128}
129
130func catFileBatch(ctx context.Context, shasToBatchReader *io.PipeReader, catFileBatchWriter *io.PipeWriter, wg *sync.WaitGroup, basePath string) {
131	defer wg.Done()
132	defer shasToBatchReader.Close()  //nolint: errcheck
133	defer catFileBatchWriter.Close() //nolint: errcheck
134
135	stderr := new(bytes.Buffer)
136	var errbuf strings.Builder
137	if err := gitm.NewCommandWithContext(ctx, "cat-file", "--batch").
138		WithTimeout(-1).
139		RunInDirWithOptions(basePath, gitm.RunInDirOptions{
140			Stdout: catFileBatchWriter,
141			Stdin:  shasToBatchReader,
142			Stderr: stderr,
143		}); err != nil {
144		_ = shasToBatchReader.CloseWithError(fmt.Errorf("git rev-list [%s]: %w - %s", basePath, err, errbuf.String()))
145	}
146}
147
// blobsLessThan1024FromCatFileBatchCheck filters `git cat-file --batch-check`
// output. For every "<sha> blob <size>" line whose size parses and is at most
// 1024 it writes "<sha>\n" to shasToBatchWriter; all other lines are dropped.
//
// On return the writer is closed with the scanner's error (nil on a clean
// EOF), the reader is closed, and wg.Done is called. If writing fails, the
// reader is closed with that error and the function returns immediately.
func blobsLessThan1024FromCatFileBatchCheck(catFileCheckReader *io.PipeReader, shasToBatchWriter *io.PipeWriter, wg *sync.WaitGroup) {
	defer wg.Done()
	defer catFileCheckReader.Close() //nolint: errcheck
	scanner := bufio.NewScanner(catFileCheckReader)
	defer func() {
		_ = shasToBatchWriter.CloseWithError(scanner.Err())
	}()
	for scanner.Scan() {
		line := scanner.Text()
		if len(line) == 0 {
			continue
		}
		fields := strings.Split(line, " ")
		if len(fields) < 3 || fields[1] != "blob" {
			continue
		}
		// A size that does not parse must not be mistaken for a zero-byte
		// blob, so such lines are skipped as well.
		size, err := strconv.Atoi(fields[2])
		if err != nil || size > 1024 {
			continue
		}
		// io.Writer guarantees a non-nil error on a short write, so no
		// retry loop is needed.
		if _, err := io.WriteString(shasToBatchWriter, fields[0]+"\n"); err != nil {
			_ = catFileCheckReader.CloseWithError(err)
			return
		}
	}
}
179
180func catFileBatchCheck(ctx context.Context, shasToCheckReader *io.PipeReader, catFileCheckWriter *io.PipeWriter, wg *sync.WaitGroup, basePath string) {
181	defer wg.Done()
182	defer shasToCheckReader.Close()  //nolint: errcheck
183	defer catFileCheckWriter.Close() //nolint: errcheck
184
185	stderr := new(bytes.Buffer)
186	var errbuf strings.Builder
187	if err := gitm.NewCommandWithContext(ctx, "cat-file", "--batch-check").
188		WithTimeout(-1).
189		RunInDirWithOptions(basePath, gitm.RunInDirOptions{
190			Stdout: catFileCheckWriter,
191			Stdin:  shasToCheckReader,
192			Stderr: stderr,
193		}); err != nil {
194		_ = shasToCheckReader.CloseWithError(fmt.Errorf("git rev-list [%s]: %w - %s", basePath, err, errbuf.String()))
195	}
196}
197
// blobsFromRevListObjects filters `git rev-list --objects` output. For every
// line of the form "<sha> SP <name>" with a non-empty name it writes "<sha>\n"
// to shasToCheckWriter. Lines without a space, or with nothing after the
// first space, are dropped.
//
// On return the writer is closed with the scanner's error (nil on a clean
// EOF), the reader is closed, and wg.Done is called. If writing fails, the
// reader is closed with that error and the function returns immediately.
func blobsFromRevListObjects(revListReader *io.PipeReader, shasToCheckWriter *io.PipeWriter, wg *sync.WaitGroup) {
	defer wg.Done()
	defer revListReader.Close() //nolint: errcheck
	scanner := bufio.NewScanner(revListReader)
	defer func() {
		_ = shasToCheckWriter.CloseWithError(scanner.Err())
	}()

	for scanner.Scan() {
		line := scanner.Text()
		// Split only at the first space: the name may itself contain or even
		// start with spaces, and must still count as non-empty in that case.
		idx := strings.IndexByte(line, ' ')
		if idx < 0 || idx == len(line)-1 {
			continue
		}
		// io.Writer guarantees a non-nil error on a short write, so no
		// retry loop is needed.
		if _, err := io.WriteString(shasToCheckWriter, line[:idx]+"\n"); err != nil {
			_ = revListReader.CloseWithError(err)
			return
		}
	}
}
226
227func revListAllObjects(ctx context.Context, revListWriter *io.PipeWriter, wg *sync.WaitGroup, basePath string, errChan chan<- error) {
228	defer wg.Done()
229	defer revListWriter.Close() //nolint: errcheck
230
231	stderr := new(bytes.Buffer)
232	var errbuf strings.Builder
233	if err := gitm.NewCommandWithContext(ctx, "rev-list", "--objects", "--all").
234		WithTimeout(-1).
235		RunInDirWithOptions(basePath, gitm.RunInDirOptions{
236			Stdout: revListWriter,
237			Stderr: stderr,
238		}); err != nil {
239		errChan <- fmt.Errorf("git rev-list [%s]: %w - %s", basePath, err, errbuf.String())
240	}
241}
242
243func pointerBlobHash(ctx context.Context, hasher func() hash.Hash, pointerChan <-chan PointerBlob, shastoCheckWriter *io.PipeWriter, wg *sync.WaitGroup) {
244	defer wg.Done()
245
246	for pointer := range pointerChan {
247		pointerSha := hex.EncodeToString(hasher().Sum([]byte(pointer.String())))
248		shastoCheckWriter.Write([]byte(pointerSha + "\n"))
249	}
250}
251
// catFileBatchLineExist reads `git cat-file --batch-check` output line by
// line and sends one bool per non-empty line on resultChan: true when the line
// has the shape
//
//	<sha> SP <type> SP <size>
//
// with a size that parses as a base-10 integer, false otherwise (for example
// a line with fewer than three space-separated parts).
//
// It returns when the reader is exhausted or fails, or when ctx is done while
// a result is waiting to be delivered. On return resultChan is closed, the
// reader is closed, and wg.Done is called.
func catFileBatchLineExist(ctx context.Context, catFileCheckReader *io.PipeReader, resultChan chan<- bool, wg *sync.WaitGroup) {
	defer wg.Done()
	defer catFileCheckReader.Close() //nolint: errcheck
	defer close(resultChan)

	scanner := bufio.NewScanner(catFileCheckReader)
	for scanner.Scan() {
		line := scanner.Text()
		if len(line) == 0 {
			continue
		}
		// Exactly one result per line, and never block past cancellation.
		select {
		case resultChan <- batchCheckLineHasSize(line):
		case <-ctx.Done():
			return
		}
	}
}

// batchCheckLineHasSize reports whether line consists of three
// space-separated parts, the last of which parses as a base-10 integer.
func batchCheckLineHasSize(line string) bool {
	idx := strings.IndexByte(line, ' ')
	if idx < 0 {
		return false
	}
	rest := line[idx+1:] // remove <sha>

	idx = strings.IndexByte(rest, ' ')
	if idx < 0 {
		return false
	}

	// The scanner has already stripped the line terminator, so everything
	// after <type> is the size.
	_, err := strconv.ParseInt(rest[idx+1:], 10, 64)
	return err == nil
}