package main

import (
	"bufio"
	"database/sql"
	"encoding/hex"
	"encoding/json"
	"flag"
	"fmt"
	"github.com/lib/pq"
	"github.com/minio/md5-simd"
	"github.com/minio/sha256-simd"
	"hash"
	"io"
	"log"
	"os"
	"runtime"
	"sync/atomic"
	"time"
)

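// HashFileResult carries the outcome of hashing a single file; when Error is
// set, only Error and Path are populated.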
type HashFileResult struct {
	Error  error
	Path   string
	SHA256 string
	MD5    string
	Size   uint64
}

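// HashFile stats and opens the file at path, streams it through both hashers
// and sends exactly one HashFileResult (success or error) on results.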
func HashFile(results chan<- HashFileResult, md5hasher *md5simd.Hasher, sha256hasher *hash.Hash, path string) {
	fi, err := os.Stat(path)
	if err != nil {
		results <- HashFileResult{
			Error: err,
			Path:  path,
		}
		return
	}

	if fi.IsDir() {
		results <- HashFileResult{
			Error: fmt.Errorf("path %s is a directory", path),
			Path:  path,
		}
		return
	}

	fh, err := os.Open(path)
	if err != nil {
		results <- HashFileResult{
			Error: err,
			Path:  path,
		}
		return
	}
	defer fh.Close()

	// Report read errors instead of silently emitting the hash of a partial file.
	if _, err = io.Copy(io.MultiWriter(*sha256hasher, *md5hasher), fh); err != nil {
		results <- HashFileResult{
			Error: err,
			Path:  path,
		}
		return
	}

	results <- HashFileResult{
		Error:  nil,
		Path:   path,
		SHA256: hex.EncodeToString((*sha256hasher).Sum(nil)),
		MD5:    hex.EncodeToString((*md5hasher).Sum(nil)),
		Size:   uint64(fi.Size()),
	}
}

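// PrintHashFileResult emits a single result in the selected output format.
// Failed results always go to stderr.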
func PrintHashFileResult(result *HashFileResult, format string, settings PostgresSettings) {
	switch format {
	case "json":
		jsonData, err := json.Marshal(*result)
		if err != nil {
			fmt.Fprintln(os.Stderr, result.Path, "Error: ", result.Error)
		} else {
			if result.Error != nil {
				fmt.Fprintln(os.Stderr, string(jsonData))
			} else {
				fmt.Println(string(jsonData))
			}
		}
	case "text":
		if result.Error != nil {
			fmt.Fprintln(os.Stderr, result.Path, "Error: ", result.Error)
		} else {
			fmt.Println(result.SHA256, result.MD5, result.Path)
		}
	case "postgres":
		if result.Error != nil {
			fmt.Fprintln(os.Stderr, result.Path, "Error: ", result.Error)
		} else {
			PostgresHashFileResult(result, settings)
		}
	}
}

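// PostgresHashFileResult stores a successful result using the prepared
// statement that matches settings.Mode; the *_binary modes bind the decoded
// raw digest bytes instead of hex strings.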
func PostgresHashFileResult(result *HashFileResult, settings PostgresSettings) {
	var err error
	var rows *sql.Rows

	hMd5, _ := hex.DecodeString(result.MD5)
	hSha256, _ := hex.DecodeString(result.SHA256)

	var stmt *sql.Stmt

	switch settings.Mode {
	case "insert_binary":
		fallthrough
	case "insert":
		stmt = settings.InsertSTMT
	case "update_binary":
		fallthrough
	case "update":
		stmt = settings.UpdateSTMT
	}

	if stmt == nil {
		fmt.Fprintln(os.Stderr, result.Path, "Invalid statement for mode: ", settings.Mode)
		return
	}

	switch settings.Mode {
	case "insert_binary":
		fallthrough
	case "update_binary":
		if settings.HasSize {
			rows, err = stmt.Query(result.Path, hMd5, hSha256, result.Size)
		} else {
			rows, err = stmt.Query(result.Path, hMd5, hSha256)
		}
	case "insert":
		fallthrough
	case "update":
		if settings.HasSize {
			rows, err = stmt.Query(result.Path, result.MD5, result.SHA256, result.Size)
		} else {
			rows, err = stmt.Query(result.Path, result.MD5, result.SHA256)
		}
	}

	if err != nil {
		fmt.Fprintln(os.Stderr, result.Path, "SQL Error: ", err)
	} else {
		fmt.Println(result.SHA256, result.MD5, result.Path)
	}
	if rows != nil {
		rows.Close()
	}
}

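// PostgresSettings bundles the output mode, whether a size column is written,
// and the prepared INSERT/UPDATE statements used by the postgres format.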
type PostgresSettings struct {
	Mode       string
	HasSize    bool
	InsertSTMT *sql.Stmt
	UpdateSTMT *sql.Stmt
}

func main() {
	taskLimit := flag.Int("tasklimit", func() int {
		result := runtime.NumCPU() * 16
		if result > 128 {
			return 128
		}
		return result
	}(), "Maximum number of concurrent hashing tasks. Lower it to avoid file descriptor limit issues. Defaults to min(128, CPU cores * 16)")

	outputFormat := flag.String("format", "text", "Output format. Allowed: text, json, postgres")
	pgConnStr := flag.String("pg_connstr", "", "Postgres connection string for the postgres output format")
	pgMode := flag.String("pg_mode", "insert", "Postgres output mode: insert, update, insert_binary or update_binary")
	pgTable := flag.String("pg_table", "files", "Postgres output table")
	pgPathRow := flag.String("pg_row_path", "path", "Postgres output column: path")
	pgMd5Row := flag.String("pg_row_md5", "md5", "Postgres output column: md5")
	pgSha256Row := flag.String("pg_row_sha256", "sha256", "Postgres output column: sha256")
	pgSizeRow := flag.String("pg_row_size", "", "Postgres output column: size (leave empty to skip storing sizes)")

	flag.Parse()

	pgSettings := PostgresSettings{
		Mode:    *pgMode,
		HasSize: len(*pgSizeRow) > 0,
	}

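	// For the postgres output format, open the connection and prepare the
	// INSERT and UPDATE statements up front; table and column names come from
	// flags and are quoted with pq.QuoteIdentifier.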
	if *outputFormat == "postgres" && *pgConnStr != "" {
		handle, err := sql.Open("postgres", *pgConnStr)
		if err != nil {
			log.Fatal(err)
		}
		defer handle.Close()

		if len(*pgSizeRow) > 0 {
			pgSettings.InsertSTMT, err = handle.Prepare(fmt.Sprintf("INSERT INTO %s (%s, %s, %s, %s) VALUES ($1, $2, $3, $4);",
				pq.QuoteIdentifier(*pgTable),
				pq.QuoteIdentifier(*pgPathRow),
				pq.QuoteIdentifier(*pgMd5Row),
				pq.QuoteIdentifier(*pgSha256Row),
				pq.QuoteIdentifier(*pgSizeRow),
			))
		} else {
			pgSettings.InsertSTMT, err = handle.Prepare(fmt.Sprintf("INSERT INTO %s (%s, %s, %s) VALUES ($1, $2, $3);",
				pq.QuoteIdentifier(*pgTable),
				pq.QuoteIdentifier(*pgPathRow),
				pq.QuoteIdentifier(*pgMd5Row),
				pq.QuoteIdentifier(*pgSha256Row),
			))
		}
		if err != nil {
			log.Fatal(err)
		}
		defer pgSettings.InsertSTMT.Close()

		if len(*pgSizeRow) > 0 {
			pgSettings.UpdateSTMT, err = handle.Prepare(fmt.Sprintf("UPDATE %s SET %s = $2, %s = $3, %s = $4 WHERE %s = $1;",
				pq.QuoteIdentifier(*pgTable),
				pq.QuoteIdentifier(*pgMd5Row),
				pq.QuoteIdentifier(*pgSha256Row),
				pq.QuoteIdentifier(*pgSizeRow),
				pq.QuoteIdentifier(*pgPathRow),
			))
		} else {
			pgSettings.UpdateSTMT, err = handle.Prepare(fmt.Sprintf("UPDATE %s SET %s = $2, %s = $3 WHERE %s = $1;",
				pq.QuoteIdentifier(*pgTable),
				pq.QuoteIdentifier(*pgMd5Row),
				pq.QuoteIdentifier(*pgSha256Row),
				pq.QuoteIdentifier(*pgPathRow),
			))
		}

		if err != nil {
			log.Fatal(err)
		}
		defer pgSettings.UpdateSTMT.Close()
	}

	var taskCount int64
	var entries int64

	scanner := bufio.NewScanner(os.Stdin)
	defer os.Stdin.Close()

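	// Build a pool of reusable hashers sized to the task limit, starting one
	// md5-simd server for every 16 MD5 hashers.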
	var md5servers []md5simd.Server
	md5hashers := make(chan md5simd.Hasher, *taskLimit)
	sha256hashers := make(chan hash.Hash, *taskLimit)

	for j := 0; j < *taskLimit; j++ {
		serverIndex := j / 16

		if (serverIndex + 1) > len(md5servers) {
			md5servers = append(md5servers, md5simd.NewServer())
		}

		hasher := md5servers[serverIndex].NewHash()
		md5hashers <- hasher
		sha256hashers <- sha256.New()
	}

	resultChannel := make(chan HashFileResult)

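	// Producer: read one path per line from stdin and spawn a hashing goroutine
	// per path, throttled so that at most taskLimit tasks run at once.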
	atomic.AddInt64(&taskCount, 1)
	go func() {
		defer atomic.AddInt64(&taskCount, -1)
		for scanner.Scan() {
			path := scanner.Text()
			atomic.AddInt64(&entries, 1)

			for atomic.LoadInt64(&taskCount) >= (int64(*taskLimit) + 1) {
				runtime.Gosched()
				time.Sleep(time.Millisecond * 20)
			}
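			// Worker: borrow an MD5/SHA-256 hasher pair from the pools, reset
			// and reuse it for this file, then hand it back for the next task.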
			atomic.AddInt64(&taskCount, 1)
			go func() {
				defer atomic.AddInt64(&taskCount, -1)
				md5hasher := <-md5hashers
				sha256hasher := <-sha256hashers
				md5hasher.Reset()
				sha256hasher.Reset()
				HashFile(resultChannel, &md5hasher, &sha256hasher, path)
				md5hashers <- md5hasher
				sha256hashers <- sha256hasher
			}()
		}
	}()

	// Print results while hashing is still in progress; atomic counters are
	// used instead of a sync.WaitGroup so output can start before all tasks finish.
	for atomic.LoadInt64(&taskCount) > 0 {
		if atomic.LoadInt64(&entries) > 0 {
			atomic.AddInt64(&entries, -1)
			result := <-resultChannel
			PrintHashFileResult(&result, *outputFormat, pgSettings)
		}
	}

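	// The producer and all hashing goroutines have exited; close the result
	// channel and print anything still pending.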
	close(resultChannel)

	for result := range resultChannel {
		PrintHashFileResult(&result, *outputFormat, pgSettings)
	}

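	// Release the pooled hashers and shut down the md5-simd servers.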
	close(md5hashers)
	close(sha256hashers)

	for md5hasher := range md5hashers {
		md5hasher.Close()
	}

	for _, md5server := range md5servers {
		md5server.Close()
	}
}