package protocol
import (
"encoding/binary"
"errors"
"fmt"
"io"
"time"
flatbuffers "github.com/google/flatbuffers/go"
"shep/internal/fbs/shep"
)
const (
// ProtocolVersion is the current protocol version.
ProtocolVersion = 1
// MaxMessageSize is the maximum allowed message size (1MB).
MaxMessageSize = 1 << 20
)
// MessageKind identifies the type of framed message (lifecycle vs capability).
type MessageKind byte
const (
KindHandshake MessageKind = 1
KindHandshakeResponse MessageKind = 2
KindEnvelope MessageKind = 3
KindLog MessageKind = 4
KindShutdown MessageKind = 5
)
var (
ErrMessageTooLarge = errors.New("message too large")
ErrInvalidMessage = errors.New("invalid message")
ErrUnknownKind = errors.New("unknown message kind")
)
// CapabilityInfo describes a capability declared by a plugin.
type CapabilityInfo struct {
Name string
Version int
}
// WriteMessage writes a framed message with kind prefix.
func WriteMessage(w io.Writer, kind MessageKind, data []byte) error {
if len(data) > MaxMessageSize {
return ErrMessageTooLarge
}
// Write kind (1 byte) + length (4 bytes big endian) + data
header := make([]byte, 5)
header[0] = byte(kind)
binary.BigEndian.PutUint32(header[1:], uint32(len(data)))
if _, err := w.Write(header); err != nil {
return fmt.Errorf("write header: %w", err)
}
if _, err := w.Write(data); err != nil {
return fmt.Errorf("write data: %w", err)
}
return nil
}
// ReadMessage reads a framed message and returns the kind and data.
func ReadMessage(r io.Reader) (MessageKind, []byte, error) {
// Read kind + length
header := make([]byte, 5)
if _, err := io.ReadFull(r, header); err != nil {
return 0, nil, fmt.Errorf("read header: %w", err)
}
kind := MessageKind(header[0])
msgLen := binary.BigEndian.Uint32(header[1:])
if msgLen > MaxMessageSize {
return 0, nil, ErrMessageTooLarge
}
data := make([]byte, msgLen)
if _, err := io.ReadFull(r, data); err != nil {
return 0, nil, fmt.Errorf("read data: %w", err)
}
return kind, data, nil
}
// BuildHandshake creates a handshake message.
func BuildHandshake(name string, capabilities []CapabilityInfo) []byte {
builder := flatbuffers.NewBuilder(256)
// Build capabilities
capOffsets := make([]flatbuffers.UOffsetT, len(capabilities))
for i, cap := range capabilities {
nameOffset := builder.CreateString(cap.Name)
shep.CapabilityStart(builder)
shep.CapabilityAddName(builder, nameOffset)
shep.CapabilityAddVersion(builder, int32(cap.Version))
capOffsets[i] = shep.CapabilityEnd(builder)
}
shep.HandshakeStartCapabilitiesVector(builder, len(capabilities))
for i := len(capOffsets) - 1; i >= 0; i-- {
builder.PrependUOffsetT(capOffsets[i])
}
capsVec := builder.EndVector(len(capabilities))
// Build handshake
nameOffset := builder.CreateString(name)
shep.HandshakeStart(builder)
shep.HandshakeAddProtocolVersion(builder, ProtocolVersion)
shep.HandshakeAddPluginName(builder, nameOffset)
shep.HandshakeAddCapabilities(builder, capsVec)
offset := shep.HandshakeEnd(builder)
builder.Finish(offset)
return builder.FinishedBytes()
}
// ParseHandshake parses a handshake message.
func ParseHandshake(data []byte) (*shep.Handshake, error) {
if len(data) == 0 {
return nil, ErrInvalidMessage
}
return shep.GetRootAsHandshake(data, 0), nil
}
// BuildHandshakeResponse creates a handshake response message.
func BuildHandshakeResponse(status shep.Status, errMsg string) []byte {
builder := flatbuffers.NewBuilder(256)
var errOffset flatbuffers.UOffsetT
if errMsg != "" {
errOffset = builder.CreateString(errMsg)
}
shep.HandshakeResponseStart(builder)
shep.HandshakeResponseAddProtocolVersion(builder, ProtocolVersion)
shep.HandshakeResponseAddStatus(builder, status)
if errMsg != "" {
shep.HandshakeResponseAddErrorMessage(builder, errOffset)
}
offset := shep.HandshakeResponseEnd(builder)
builder.Finish(offset)
return builder.FinishedBytes()
}
// ParseHandshakeResponse parses a handshake response message.
func ParseHandshakeResponse(data []byte) (*shep.HandshakeResponse, error) {
if len(data) == 0 {
return nil, ErrInvalidMessage
}
return shep.GetRootAsHandshakeResponse(data, 0), nil
}
// BuildEnvelope creates a capability envelope message.
func BuildEnvelope(capability, correlationID string, isResponse bool, errMsg string, payload []byte) []byte {
builder := flatbuffers.NewBuilder(256 + len(payload))
capOffset := builder.CreateString(capability)
corrOffset := builder.CreateString(correlationID)
var errOffset flatbuffers.UOffsetT
if errMsg != "" {
errOffset = builder.CreateString(errMsg)
}
var payloadOffset flatbuffers.UOffsetT
if len(payload) > 0 {
payloadOffset = builder.CreateByteVector(payload)
}
shep.EnvelopeStart(builder)
shep.EnvelopeAddProtocolVersion(builder, ProtocolVersion)
shep.EnvelopeAddCapability(builder, capOffset)
shep.EnvelopeAddCorrelationId(builder, corrOffset)
shep.EnvelopeAddIsResponse(builder, isResponse)
if errMsg != "" {
shep.EnvelopeAddError(builder, errOffset)
}
if len(payload) > 0 {
shep.EnvelopeAddPayload(builder, payloadOffset)
}
offset := shep.EnvelopeEnd(builder)
builder.Finish(offset)
return builder.FinishedBytes()
}
// ParseEnvelope parses an envelope message.
func ParseEnvelope(data []byte) (*shep.Envelope, error) {
if len(data) == 0 {
return nil, ErrInvalidMessage
}
return shep.GetRootAsEnvelope(data, 0), nil
}
// BuildLog creates a log message.
func BuildLog(level shep.LogLevel, message string) []byte {
builder := flatbuffers.NewBuilder(256)
msgOffset := builder.CreateString(message)
shep.LogMessageStart(builder)
shep.LogMessageAddLevel(builder, level)
shep.LogMessageAddMessage(builder, msgOffset)
shep.LogMessageAddTimestamp(builder, time.Now().UnixMilli())
offset := shep.LogMessageEnd(builder)
builder.Finish(offset)
return builder.FinishedBytes()
}
// ParseLog parses a log message.
func ParseLog(data []byte) (*shep.LogMessage, error) {
if len(data) == 0 {
return nil, ErrInvalidMessage
}
return shep.GetRootAsLogMessage(data, 0), nil
}
// BuildShutdown creates a shutdown message.
func BuildShutdown(reason string) []byte {
builder := flatbuffers.NewBuilder(128)
reasonOffset := builder.CreateString(reason)
shep.ShutdownStart(builder)
shep.ShutdownAddReason(builder, reasonOffset)
offset := shep.ShutdownEnd(builder)
builder.Finish(offset)
return builder.FinishedBytes()
}
// ParseShutdown parses a shutdown message.
func ParseShutdown(data []byte) (*shep.Shutdown, error) {
if len(data) == 0 {
return nil, ErrInvalidMessage
}
return shep.GetRootAsShutdown(data, 0), nil
}