package memcached import ( "encoding/binary" "fmt" "github.com/couchbase/gomemcached" "math" ) type SystemEventType int const InvalidSysEvent SystemEventType = -1 const ( CollectionCreate SystemEventType = 0 CollectionDrop SystemEventType = iota CollectionFlush SystemEventType = iota // KV did not implement ScopeCreate SystemEventType = iota ScopeDrop SystemEventType = iota CollectionChanged SystemEventType = iota ) type ScopeCreateEvent interface { GetSystemEventName() (string, error) GetScopeId() (uint32, error) GetManifestId() (uint64, error) } type CollectionCreateEvent interface { GetSystemEventName() (string, error) GetScopeId() (uint32, error) GetCollectionId() (uint32, error) GetManifestId() (uint64, error) GetMaxTTL() (uint32, error) } type CollectionDropEvent interface { GetScopeId() (uint32, error) GetCollectionId() (uint32, error) GetManifestId() (uint64, error) } type ScopeDropEvent interface { GetScopeId() (uint32, error) GetManifestId() (uint64, error) } type CollectionChangedEvent interface { GetCollectionId() (uint32, error) GetManifestId() (uint64, error) GetMaxTTL() (uint32, error) } var ErrorInvalidOp error = fmt.Errorf("Invalid Operation") var ErrorInvalidVersion error = fmt.Errorf("Invalid version for parsing") var ErrorValueTooShort error = fmt.Errorf("Value length is too short") var ErrorNoMaxTTL error = fmt.Errorf("This event has no max TTL") // UprEvent memcached events for UPR streams. type UprEvent struct { Opcode gomemcached.CommandCode // Type of event Status gomemcached.Status // Response status VBucket uint16 // VBucket this event applies to DataType uint8 // data type Opaque uint16 // 16 MSB of opaque VBuuid uint64 // This field is set by downstream Flags uint32 // Item flags Expiry uint32 // Item expiration time Key, Value []byte // Item key/value OldValue []byte // TODO: TBD: old document value Cas uint64 // CAS value of the item Seqno uint64 // sequence number of the mutation RevSeqno uint64 // rev sequence number : deletions LockTime uint32 // Lock time MetadataSize uint16 // Metadata size SnapstartSeq uint64 // start sequence number of this snapshot SnapendSeq uint64 // End sequence number of the snapshot SnapshotType uint32 // 0: disk 1: memory FailoverLog *FailoverLog // Failover log containing vvuid and sequnce number Error error // Error value in case of a failure ExtMeta []byte // Extended Metadata AckSize uint32 // The number of bytes that can be Acked to DCP SystemEvent SystemEventType // Only valid if IsSystemEvent() is true SysEventVersion uint8 // Based on the version, the way Extra bytes is parsed is different ValueLen int // Cache it to avoid len() calls for performance CollectionId uint64 // Valid if Collection is in use } // FailoverLog containing vvuid and sequnce number type FailoverLog [][2]uint64 func makeUprEvent(rq gomemcached.MCRequest, stream *UprStream, bytesReceivedFromDCP int) *UprEvent { event := &UprEvent{ Opcode: rq.Opcode, VBucket: stream.Vbucket, VBuuid: stream.Vbuuid, Value: rq.Body, Cas: rq.Cas, ExtMeta: rq.ExtMeta, DataType: rq.DataType, ValueLen: len(rq.Body), SystemEvent: InvalidSysEvent, CollectionId: math.MaxUint64, } event.PopulateFieldsBasedOnStreamType(rq, stream.StreamType) // set AckSize for events that need to be acked to DCP, // i.e., events with CommandCodes that need to be buffered in DCP if _, ok := gomemcached.BufferedCommandCodeMap[rq.Opcode]; ok { event.AckSize = uint32(bytesReceivedFromDCP) } // 16 LSBits are used by client library to encode vbucket number. // 16 MSBits are left for application to multiplex on opaque value. event.Opaque = appOpaque(rq.Opaque) if len(rq.Extras) >= uprMutationExtraLen && event.Opcode == gomemcached.UPR_MUTATION { event.Seqno = binary.BigEndian.Uint64(rq.Extras[:8]) event.RevSeqno = binary.BigEndian.Uint64(rq.Extras[8:16]) event.Flags = binary.BigEndian.Uint32(rq.Extras[16:20]) event.Expiry = binary.BigEndian.Uint32(rq.Extras[20:24]) event.LockTime = binary.BigEndian.Uint32(rq.Extras[24:28]) event.MetadataSize = binary.BigEndian.Uint16(rq.Extras[28:30]) } else if len(rq.Extras) >= uprDeletetionWithDeletionTimeExtraLen && event.Opcode == gomemcached.UPR_DELETION { event.Seqno = binary.BigEndian.Uint64(rq.Extras[:8]) event.RevSeqno = binary.BigEndian.Uint64(rq.Extras[8:16]) event.Expiry = binary.BigEndian.Uint32(rq.Extras[16:20]) } else if len(rq.Extras) >= uprDeletetionExtraLen && event.Opcode == gomemcached.UPR_DELETION || event.Opcode == gomemcached.UPR_EXPIRATION { event.Seqno = binary.BigEndian.Uint64(rq.Extras[:8]) event.RevSeqno = binary.BigEndian.Uint64(rq.Extras[8:16]) event.MetadataSize = binary.BigEndian.Uint16(rq.Extras[16:18]) } else if len(rq.Extras) >= uprSnapshotExtraLen && event.Opcode == gomemcached.UPR_SNAPSHOT { event.SnapstartSeq = binary.BigEndian.Uint64(rq.Extras[:8]) event.SnapendSeq = binary.BigEndian.Uint64(rq.Extras[8:16]) event.SnapshotType = binary.BigEndian.Uint32(rq.Extras[16:20]) } else if event.IsSystemEvent() { event.PopulateEvent(rq.Extras) } return event } func (event *UprEvent) PopulateFieldsBasedOnStreamType(rq gomemcached.MCRequest, streamType DcpStreamType) { switch streamType { case CollectionsNonStreamId: switch rq.Opcode { // Only these will have CID encoded within the key case gomemcached.UPR_MUTATION, gomemcached.UPR_DELETION, gomemcached.UPR_EXPIRATION: uleb128 := Uleb128(rq.Key) result, bytesShifted := uleb128.ToUint64(rq.Keylen) event.CollectionId = result event.Key = rq.Key[bytesShifted:] default: event.Key = rq.Key } case CollectionsStreamId: // TODO - not implemented fallthrough case NonCollectionStream: // Let default behavior be legacy stream type fallthrough default: event.Key = rq.Key } } func (event *UprEvent) String() string { name := gomemcached.CommandNames[event.Opcode] if name == "" { name = fmt.Sprintf("#%d", event.Opcode) } return name } func (event *UprEvent) IsSnappyDataType() bool { return event.Opcode == gomemcached.UPR_MUTATION && (event.DataType&SnappyDataType > 0) } func (event *UprEvent) IsCollectionType() bool { return event.IsSystemEvent() || event.CollectionId <= math.MaxUint32 } func (event *UprEvent) IsSystemEvent() bool { return event.Opcode == gomemcached.DCP_SYSTEM_EVENT } func (event *UprEvent) PopulateEvent(extras []byte) { if len(extras) < dcpSystemEventExtraLen { // Wrong length, don't parse return } event.Seqno = binary.BigEndian.Uint64(extras[:8]) event.SystemEvent = SystemEventType(binary.BigEndian.Uint32(extras[8:12])) var versionTemp uint16 = binary.BigEndian.Uint16(extras[12:14]) event.SysEventVersion = uint8(versionTemp >> 8) } func (event *UprEvent) GetSystemEventName() (string, error) { switch event.SystemEvent { case CollectionCreate: fallthrough case ScopeCreate: return string(event.Key), nil default: return "", ErrorInvalidOp } } func (event *UprEvent) GetManifestId() (uint64, error) { switch event.SystemEvent { // Version 0 only checks case CollectionChanged: fallthrough case ScopeDrop: fallthrough case ScopeCreate: fallthrough case CollectionDrop: if event.SysEventVersion > 0 { return 0, ErrorInvalidVersion } fallthrough case CollectionCreate: // CollectionCreate supports version 1 if event.SysEventVersion > 1 { return 0, ErrorInvalidVersion } if event.ValueLen < 8 { return 0, ErrorValueTooShort } return binary.BigEndian.Uint64(event.Value[0:8]), nil default: return 0, ErrorInvalidOp } } func (event *UprEvent) GetCollectionId() (uint32, error) { switch event.SystemEvent { case CollectionDrop: if event.SysEventVersion > 0 { return 0, ErrorInvalidVersion } fallthrough case CollectionCreate: if event.SysEventVersion > 1 { return 0, ErrorInvalidVersion } if event.ValueLen < 16 { return 0, ErrorValueTooShort } return binary.BigEndian.Uint32(event.Value[12:16]), nil case CollectionChanged: if event.SysEventVersion > 0 { return 0, ErrorInvalidVersion } if event.ValueLen < 12 { return 0, ErrorValueTooShort } return binary.BigEndian.Uint32(event.Value[8:12]), nil default: return 0, ErrorInvalidOp } } func (event *UprEvent) GetScopeId() (uint32, error) { switch event.SystemEvent { // version 0 checks case ScopeCreate: fallthrough case ScopeDrop: fallthrough case CollectionDrop: if event.SysEventVersion > 0 { return 0, ErrorInvalidVersion } fallthrough case CollectionCreate: // CollectionCreate could be either 0 or 1 if event.SysEventVersion > 1 { return 0, ErrorInvalidVersion } if event.ValueLen < 12 { return 0, ErrorValueTooShort } return binary.BigEndian.Uint32(event.Value[8:12]), nil default: return 0, ErrorInvalidOp } } func (event *UprEvent) GetMaxTTL() (uint32, error) { switch event.SystemEvent { case CollectionCreate: if event.SysEventVersion < 1 { return 0, ErrorNoMaxTTL } if event.ValueLen < 20 { return 0, ErrorValueTooShort } return binary.BigEndian.Uint32(event.Value[16:20]), nil case CollectionChanged: if event.SysEventVersion > 0 { return 0, ErrorInvalidVersion } if event.ValueLen < 16 { return 0, ErrorValueTooShort } return binary.BigEndian.Uint32(event.Value[12:16]), nil default: return 0, ErrorInvalidOp } } type Uleb128 []byte func (u Uleb128) ToUint64(cachedLen int) (result uint64, bytesShifted int) { var shift uint = 0 for curByte := 0; curByte < cachedLen; curByte++ { oneByte := u[curByte] last7Bits := 0x7f & oneByte result |= uint64(last7Bits) << shift bytesShifted++ if oneByte&0x80 == 0 { break } shift += 7 } return }