package main import ( "bufio" "database/sql" "encoding/hex" "encoding/json" "flag" "fmt" "github.com/lib/pq" "github.com/minio/md5-simd" "github.com/minio/sha256-simd" "hash" "io" "log" "os" "runtime" "sync/atomic" "time" ) type HashFileResult struct { Error error Path string SHA256 string MD5 string Size uint64 } func HashFile(results chan<- HashFileResult, md5hasher *md5simd.Hasher, sha256hasher *hash.Hash, path string) { fi, err := os.Stat(path) if err != nil { results <- HashFileResult{ Error: err, Path: path, } return } if fi.IsDir() { results <- HashFileResult{ Error: fmt.Errorf("path %s is a directory", path), Path: path, } return } fh, err := os.Open(path) if err != nil { results <- HashFileResult{ Error: err, Path: path, } return } defer fh.Close() io.Copy(io.MultiWriter(*sha256hasher, *md5hasher), fh) results <- HashFileResult{ Error: nil, Path: path, SHA256: hex.EncodeToString((*sha256hasher).Sum(nil)), MD5: hex.EncodeToString((*md5hasher).Sum(nil)), Size: uint64(fi.Size()), } } func PrintHashFileResult(result *HashFileResult, format string, settings PostgresSettings) { switch format { case "json": var jsonData []byte jsonData, err := json.Marshal(*result) if err != nil { fmt.Fprintln(os.Stderr, result.Path, "Error: ", result.Error) } else { if result.Error != nil { fmt.Fprintln(os.Stderr, string(jsonData)) } else { fmt.Println(string(jsonData)) } } case "text": if result.Error != nil { fmt.Fprintln(os.Stderr, result.Path, "Error: ", result.Error) } else { fmt.Println(result.SHA256, result.MD5, result.Path) } case "postgres": if result.Error != nil { fmt.Fprintln(os.Stderr, result.Path, "Error: ", result.Error) } else { PostgresHashFileResult(result, settings) } } } func PostgresHashFileResult(result *HashFileResult, settings PostgresSettings) { var err error var rows *sql.Rows hMd5, _ := hex.DecodeString(result.MD5) hSha256, _ := hex.DecodeString(result.SHA256) var stmt *sql.Stmt switch settings.Mode { case "insert_binary": fallthrough case "insert": stmt = settings.InsertSTMT case "update_binary": fallthrough case "update": stmt = settings.UpdateSTMT break } if stmt == nil { fmt.Fprintln(os.Stderr, result.Path, "Invalid statement for mode: ", settings.Mode) } switch settings.Mode { case "insert_binary": fallthrough case "update_binary": if settings.HasSize { rows, err = stmt.Query(result.Path, hMd5, hSha256, result.Size) } else { rows, err = stmt.Query(result.Path, hMd5, hSha256) } case "insert": fallthrough case "update": if settings.HasSize { rows, err = stmt.Query(result.Path, result.MD5, result.SHA256, result.Size) } else { rows, err = stmt.Query(result.Path, result.MD5, result.SHA256) } break } if err != nil { fmt.Fprintln(os.Stderr, result.Path, "SQL Error: ", err) } else { fmt.Println(result.SHA256, result.MD5, result.Path) } if rows != nil { rows.Close() } } type PostgresSettings struct { Mode string HasSize bool InsertSTMT *sql.Stmt UpdateSTMT *sql.Stmt } func main() { taskLimit := flag.Int("tasklimit", func() int { result := runtime.NumCPU() * 16 if result > 128 { return 128 } return result }(), "Maximum number of concurrent hashing tasks. Change to avoid fdlimit issues. Defaults to number of min(128, CPU cores * 16)") outputFormat := flag.String("format", "text", "Output formats. Allowed: text, json, postgres") pgConnStr := flag.String("pg_connstr", "", "Postgres connection string for postgres output format") pgMode := flag.String("pg_mode", "insert", "Postgres output mode, insert or update, additionally _binary") pgTable := flag.String("pg_table", "files", "Postgres output table") pgPathRow := flag.String("pg_row_path", "path", "Postgres output row: path") pgMd5Row := flag.String("pg_row_md5", "md5", "Postgres output row: md5") pgSha256Row := flag.String("pg_row_sha256", "sha256", "Postgres output row: sha256") pgSizeRow := flag.String("pg_row_size", "", "Postgres output row: size") flag.Parse() pgSettings := PostgresSettings{ Mode: *pgMode, HasSize: len(*pgSizeRow) > 0, } if *outputFormat == "postgres" && *pgConnStr != "" { handle, err := sql.Open("postgres", *pgConnStr) if err != nil { log.Fatal(err) } defer handle.Close() if len(*pgSizeRow) > 0 { pgSettings.InsertSTMT, err = handle.Prepare(fmt.Sprintf("INSERT INTO %s (%s, %s, %s, %s) VALUES ($1, $2, $3, $4);", pq.QuoteIdentifier(*pgTable), pq.QuoteIdentifier(*pgPathRow), pq.QuoteIdentifier(*pgMd5Row), pq.QuoteIdentifier(*pgSha256Row), pq.QuoteIdentifier(*pgSizeRow), )) } else { pgSettings.InsertSTMT, err = handle.Prepare(fmt.Sprintf("INSERT INTO %s (%s, %s, %s) VALUES ($1, $2, $3);", pq.QuoteIdentifier(*pgTable), pq.QuoteIdentifier(*pgPathRow), pq.QuoteIdentifier(*pgMd5Row), pq.QuoteIdentifier(*pgSha256Row), )) } if err != nil { log.Fatal(err) } defer pgSettings.InsertSTMT.Close() if len(*pgSizeRow) > 0 { pgSettings.UpdateSTMT, err = handle.Prepare(fmt.Sprintf("UPDATE %s SET %s = $2, %s = $3, %s = $4 WHERE %s = $1;", pq.QuoteIdentifier(*pgTable), pq.QuoteIdentifier(*pgMd5Row), pq.QuoteIdentifier(*pgSha256Row), pq.QuoteIdentifier(*pgSizeRow), pq.QuoteIdentifier(*pgPathRow), )) } else { pgSettings.UpdateSTMT, err = handle.Prepare(fmt.Sprintf("UPDATE %s SET %s = $2, %s = $3 WHERE %s = $1;", pq.QuoteIdentifier(*pgTable), pq.QuoteIdentifier(*pgMd5Row), pq.QuoteIdentifier(*pgSha256Row), pq.QuoteIdentifier(*pgPathRow), )) } if err != nil { log.Fatal(err) } defer pgSettings.UpdateSTMT.Close() } var taskCount int64 var entries int64 scanner := bufio.NewScanner(os.Stdin) defer os.Stdin.Close() var md5servers []md5simd.Server md5hashers := make(chan md5simd.Hasher, *taskLimit) sha256hashers := make(chan hash.Hash, *taskLimit) for j := 0; j < *taskLimit; j++ { serverIndex := j / 16 if (serverIndex + 1) > len(md5servers) { md5servers = append(md5servers, md5simd.NewServer()) } hasher := md5servers[serverIndex].NewHash() md5hashers <- hasher sha256hashers <- sha256.New() } resultChannel := make(chan HashFileResult) atomic.AddInt64(&taskCount, 1) go func() { defer atomic.AddInt64(&taskCount, -1) for scanner.Scan() { path := scanner.Text() atomic.AddInt64(&entries, 1) for atomic.LoadInt64(&taskCount) >= (int64(*taskLimit) + 1) { runtime.Gosched() time.Sleep(time.Millisecond * 20) } atomic.AddInt64(&taskCount, 1) go func() { defer atomic.AddInt64(&taskCount, -1) md5hasher := <-md5hashers sha256hasher := <-sha256hashers md5hasher.Reset() sha256hasher.Reset() HashFile(resultChannel, &md5hasher, &sha256hasher, path) md5hashers <- md5hasher sha256hashers <- sha256hasher }() } }() //Already print before finishing, use atomic ints instead of a WaitGroup for atomic.LoadInt64(&taskCount) > 0 { if atomic.LoadInt64(&entries) > 0 { atomic.AddInt64(&entries, -1) result := <-resultChannel PrintHashFileResult(&result, *outputFormat, pgSettings) } } close(resultChannel) for result := range resultChannel { PrintHashFileResult(&result, *outputFormat, pgSettings) } close(md5hashers) close(sha256hashers) for md5hasher := range md5hashers { md5hasher.Close() } for _, md5server := range md5servers { md5server.Close() } }