diff --git a/internal/carrier/bytestream.go b/internal/carrier/bytestream.go new file mode 100644 index 0000000..e7967a4 --- /dev/null +++ b/internal/carrier/bytestream.go @@ -0,0 +1,47 @@ +package carrier + +import ( + "context" + + "github.com/pion/webrtc/v4" +) + +// ByteStream is a carrier capability for bidirectional byte transport. +type ByteStream interface { + Connect(ctx context.Context) error + Send(data []byte) error + Close() error + SetReconnectCallback(cb func()) + SetShouldReconnect(fn func() bool) + SetEndedCallback(cb func(string)) + WatchConnection(ctx context.Context) + CanSend() bool +} + +type providerByteStream struct { + carrier Carrier +} + +// OpenByteStream adapts a carrier to a generic byte-stream capability. +func OpenByteStream(c Carrier) ByteStream { + return &providerByteStream{carrier: c} +} + +func (p *providerByteStream) Connect(ctx context.Context) error { return p.carrier.Connect(ctx) } +func (p *providerByteStream) Send(data []byte) error { return p.carrier.Send(data) } +func (p *providerByteStream) Close() error { return p.carrier.Close() } + +func (p *providerByteStream) SetReconnectCallback(cb func()) { + p.carrier.SetReconnectCallback(func(_ *webrtc.DataChannel) { + if cb != nil { + cb() + } + }) +} + +func (p *providerByteStream) SetShouldReconnect(fn func() bool) { p.carrier.SetShouldReconnect(fn) } +func (p *providerByteStream) SetEndedCallback(cb func(string)) { p.carrier.SetEndedCallback(cb) } +func (p *providerByteStream) WatchConnection(ctx context.Context) { + p.carrier.WatchConnection(ctx) +} +func (p *providerByteStream) CanSend() bool { return p.carrier.CanSend() } diff --git a/internal/transport/datachannel/transport.go b/internal/transport/datachannel/transport.go index f57e6ca..3faa4ca 100644 --- a/internal/transport/datachannel/transport.go +++ b/internal/transport/datachannel/transport.go @@ -7,11 +7,10 @@ import ( "github.com/openlibrecommunity/olcrtc/internal/carrier" "github.com/openlibrecommunity/olcrtc/internal/transport" - "github.com/pion/webrtc/v4" ) -type providerTransport struct { - carrier carrier.Carrier +type streamTransport struct { + stream carrier.ByteStream } // New creates a datachannel transport backed by a carrier-specific provider. @@ -28,49 +27,45 @@ func New(ctx context.Context, cfg transport.Config) (transport.Transport, error) return nil, fmt.Errorf("create provider transport: %w", err) } - return &providerTransport{carrier: c}, nil + return &streamTransport{stream: carrier.OpenByteStream(c)}, nil } // Connect starts the transport connection. -func (p *providerTransport) Connect(ctx context.Context) error { - return p.carrier.Connect(ctx) +func (p *streamTransport) Connect(ctx context.Context) error { + return p.stream.Connect(ctx) } // Send transmits data through the transport. -func (p *providerTransport) Send(data []byte) error { - return p.carrier.Send(data) +func (p *streamTransport) Send(data []byte) error { + return p.stream.Send(data) } // Close terminates the transport. -func (p *providerTransport) Close() error { - return p.carrier.Close() +func (p *streamTransport) Close() error { + return p.stream.Close() } // SetReconnectCallback registers reconnect handling. -func (p *providerTransport) SetReconnectCallback(cb func()) { - p.carrier.SetReconnectCallback(func(_ *webrtc.DataChannel) { - if cb != nil { - cb() - } - }) +func (p *streamTransport) SetReconnectCallback(cb func()) { + p.stream.SetReconnectCallback(cb) } // SetShouldReconnect configures reconnect policy. -func (p *providerTransport) SetShouldReconnect(fn func() bool) { - p.carrier.SetShouldReconnect(fn) +func (p *streamTransport) SetShouldReconnect(fn func() bool) { + p.stream.SetShouldReconnect(fn) } // SetEndedCallback registers end-of-session handling. -func (p *providerTransport) SetEndedCallback(cb func(string)) { - p.carrier.SetEndedCallback(cb) +func (p *streamTransport) SetEndedCallback(cb func(string)) { + p.stream.SetEndedCallback(cb) } // WatchConnection monitors connection lifecycle. -func (p *providerTransport) WatchConnection(ctx context.Context) { - p.carrier.WatchConnection(ctx) +func (p *streamTransport) WatchConnection(ctx context.Context) { + p.stream.WatchConnection(ctx) } // CanSend reports whether transport is ready for sending. -func (p *providerTransport) CanSend() bool { - return p.carrier.CanSend() +func (p *streamTransport) CanSend() bool { + return p.stream.CanSend() }